commit 74afbad9a83eebea8d96fb990b2715239731b231 Author: MSVstudios <98731643+MSVstudios@users.noreply.github.com> Date: Mon Nov 17 09:02:53 2025 +0100 Initial release of V02 Add SQL queries for file record analysis and S3 utility functions - Introduced SQL queries to identify records with specific file types, including H264 variants, non-FILE audio and video files, and non-image digital files. - Added aggregate queries to count unique base records per file type. - Implemented S3 utility functions for file operations, including uploading, downloading, and checking file existence. - Enhanced error handling and logging throughout the S3 file processing workflow. - Updated requirements.txt with necessary dependencies for S3 and database interactions. - Created utility functions for media validation, focusing on video and audio file checks. diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..c1091d1 --- /dev/null +++ b/.gitignore @@ -0,0 +1,9 @@ +.venv/ +.env +.vscode/ +__pycache__/ +*.pyc +*.pyo +logs/ +*.logs +*.log \ No newline at end of file diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..9787790 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,40 @@ +# Use the official Python 3.11 image from the Docker Hub +FROM python:3.11-slim + +# Set the working directory inside the container +WORKDIR /app + +# Copy the requirements file into the container +COPY ./requirements.txt . + +# Install the required Python packages +RUN pip install --no-cache-dir -r requirements.txt + +# Copy the .env file into the /app directory +# COPY ./.env /app/.env + +# Copy the rest of the application code into the container +COPY . . 
+ +RUN chmod +x /app/cron_launch.sh + +# Install cron +RUN apt-get update && apt-get install -y cron + +# Add the cron job +# evry 10 min +# RUN echo "*/10 * * * * /usr/local/bin/python /app/main.py >> /var/log/cron.log 2>&1" > /etc/cron.d/your_cron_job +# 1 AM +RUN echo "0 1 * * * /bin/bash /app/cron_launch.sh >> /var/log/cron.log 2>&1" > /etc/cron.d/your_cron_job + +# Give execution rights on the cron job +RUN chmod 0644 /etc/cron.d/your_cron_job + +# Apply the cron job +RUN crontab /etc/cron.d/your_cron_job + +# Create the log file to be able to run tail +RUN touch /var/log/cron.log + +# Run the command on container startup +CMD cron && tail -f /var/log/cron.log \ No newline at end of file diff --git a/README.md b/README.md new file mode 100644 index 0000000..39a851b --- /dev/null +++ b/README.md @@ -0,0 +1,138 @@ +# Project Setup + +## Setting up a Virtual Environment + +1. **Create a virtual environment:** + + ### For Linux/macOS: + ```bash + python3 -m venv .venv + ``` + + ### For Windows: + ```bash + ## ACH-server-import-media + + This repository contains a script to import media files from an S3-compatible bucket into a database. It supports both local execution (virtual environment) and Docker-based deployment via `docker-compose`. + + Contents + - `main.py` - main import script + - `docker-compose.yml` - docker-compose service for running the importer in a container + - `requirements.txt` - Python dependencies + - `config.py`, `.env` - configuration and environment variables + + Prerequisites + - Docker & Docker Compose (or Docker Desktop) + - Python 3.8+ + - Git (optional) + + Quick local setup (virtual environment) + + Linux / macOS + ```bash + python3 -m venv .venv + source .venv/bin/activate + pip install -r requirements.txt + ``` + + Windows (PowerShell) + ```powershell + python -m venv .venv + . .venv\Scripts\Activate.ps1 + pip install -r requirements.txt + ``` + + Running locally + 1. 
Ensure your configuration is available (see `config.py` or provide a `.env` file with the environment variables used by the project). + 2. Run the script (from the project root): + + Linux / macOS + ```bash + python main.py + ``` + + Windows (PowerShell) + ```powershell + & .venv\Scripts\python.exe main.py + ``` + + Docker Compose + + This project includes a `docker-compose.yml` with a service named `app` (container name `ACH_server_media_importer`). The compose file reads environment variables from `.env` and mounts a `logs` named volume. + + Build and run (detached) + ```powershell + # Docker Compose v2 syntax (recommended) + # From the repository root + + docker compose up -d --build + + # OR if your environment uses the v1 binary + # docker-compose up -d --build + ``` + + Show logs + ```powershell + # Follow logs for the 'app' service + docker compose logs -f app + + # Or use the container name + docker logs -f ACH_server_media_importer + ``` + + Stop / start / down + ```powershell + # Stop containers + docker compose stop + + # Start again + docker compose start + + # Take down containers and network + docker compose down + ``` + + Rebuild when already running + + There are two safe, common ways to rebuild a service when the containers are already running: + + 1) Rebuild in-place and recreate changed containers (recommended for most changes): + + ```powershell + # Rebuild images and recreate services in the background + docker compose up -d --build + ``` + + This tells Compose to rebuild the image(s) and recreate containers for services whose image or configuration changed. 
+ + 2) Full clean rebuild (use when you need to remove volumes or ensure a clean state): + + ```powershell + # Stop and remove containers, networks, and optionally volumes & images, then rebuild + docker compose down --volumes --rmi local + docker compose up -d --build + ``` + + Notes + - `docker compose up -d --build` will recreate containers for services that need updating; it does not destroy named volumes unless you pass `--volumes` to `down`. + - If you need to execute a shell inside the running container: + + ```powershell + # run a shell inside the 'app' service + docker compose exec app /bin/sh + # or (if bash is available) + docker compose exec app /bin/bash + ``` + + Environment and configuration + - Provide sensitive values via a `.env` file (the `docker-compose.yml` already references `.env`). + - Typical variables: `AWS_ACCESS_KEY_ID`, `AWS_SECRET_ACCESS_KEY`, `AWS_REGION`, `BUCKET_NAME`, `DB_HOST`, `DB_NAME`, `DB_USER`, `SMTP_SERVER`, etc. + + Troubleshooting + - If Compose fails to pick up code changes, ensure your local Dockerfile `COPY` commands include the source files and that `docker compose up -d --build` is run from the repository root. + - Use `docker compose logs -f app` to inspect runtime errors. + + If you'd like, I can add a short `Makefile` or a PowerShell script to wrap the common Docker Compose commands (build, rebuild, logs) for convenience. + + --- + Edited to add clear docker-compose rebuild and run instructions. diff --git a/build.sh b/build.sh new file mode 100644 index 0000000..04eb29f --- /dev/null +++ b/build.sh @@ -0,0 +1 @@ +docker-compose down && docker-compose build && docker-compose up -d \ No newline at end of file diff --git a/config.py b/config.py new file mode 100644 index 0000000..739f200 --- /dev/null +++ b/config.py @@ -0,0 +1,70 @@ +import os +from dotenv import load_dotenv +# import logging +import json + +def load_config(): + """ + Loads configuration from environment variables. 
+ """ + + # Load environment variables from .env file + load_dotenv() + + # Define configuration dictionaries + aws_config = { + 'aws_access_key_id': os.getenv('AWS_ACCESS_KEY_ID', ''), + 'aws_secret_access_key': os.getenv('AWS_SECRET_ACCESS_KEY', ''), + 'region_name': os.getenv('AWS_REGION', ''), + 'endpoint_url': os.getenv('AWS_ENDPOINT_URL', ''), + } + + db_config = { + 'host': os.getenv('DB_HOST', ''), + 'database': os.getenv('DB_NAME', ''), + 'user': os.getenv('DB_USER', ''), + 'password': os.getenv('DB_PASSWORD', ''), + 'port': os.getenv('DB_PORT', ''), + } + + ach_config = { + 'ach_editor_id': int(os.getenv('ACH_EDITOR_ID', '0')), + 'ach_approver_id': int(os.getenv('ACH_APPROVER_ID', '0')), + 'ach_notes': os.getenv('ACH_NOTES', ''), + 'ach_storage_location': json.loads(os.getenv('ACH_STORAGE_LOCATION', '{}')), + 'ach_file_type': json.loads(os.getenv('ACH_FILE_TYPE', '{}')), # unused + } + + # Define ach_variables dictionary - consider moving these values to a separate configuration file (e.g., ach_variables.json) + ach_variables = { + 'custom_data_in': {}, + 'disk_size': 0, + 'media_disk_size': 0, + 'pdf_disk_size': 0, + 'extension': '.', + 'conservative_copy_extension': '.', + 'file_fullpath': '', + 'objectKeys': { + 'media': None, + 'pdf': None, + 'conservative_copy': None, + }, + 'inventory_code': '', + } + + bucket_name = os.getenv('BUCKET_NAME', 'artchive-dev') + + # Load configurations from environment variables + # (No need to create namedtuple instances here) + + # Log configuration loading status + ''' logging.info(f'AWS config loaded: {aws_config}') + logging.info(f'DB config loaded: {db_config}') + logging.info(f'ACH config loaded: {ach_config}') + logging.info(f'ACH variables loaded: {ach_variables}') + logging.info(f'Bucket name loaded: {bucket_name}')''' + + return aws_config, db_config, ach_config, bucket_name, ach_variables + + +# Consider using a class for a more structured approach (optional) \ No newline at end of file diff --git 
a/countfiles.py b/countfiles.py new file mode 100644 index 0000000..cd1af99 --- /dev/null +++ b/countfiles.py @@ -0,0 +1,219 @@ +# v20251103 - Main script to import media files from S3 to the database +import logging +import time +from datetime import datetime +import pytz +import os +from logging_config import setup_logging, CUSTOM_ERROR_LEVEL +from email_utils import handle_error, send_email_with_attachment +from s3_utils import create_s3_client, list_s3_bucket, parse_s3_files +from error_handler import handle_general_error, handle_file_not_found_error, handle_value_error +from file_utils import is_file_empty +from db_utils import count_files, get_distinct_filenames_from_db +from dotenv import load_dotenv +import config +import psycopg2 + +load_dotenv() + +# MAIN PROCESS +def main_process(aws_config, db_config, ach_config, bucket_name, ach_variables): + # import global variables + #from config import load_config, aws_config, db_config, ach_config, bucket_name + #global aws_config, db_config, ach_config, bucket_name + #config import load_config , aws_config, db_config, ach_config, bucket_name + #load_config() + + logging.info(f"bucket_name: {bucket_name}") + + # Ensure timing variables are always defined so later error-email logic + # won't fail if an exception is raised before end_time/elapsed_time is set. 
+ start_time = time.time() + end_time = start_time + elapsed_time = 0.0 + + try: + logging.info("Starting the main process...") + + # Create the S3 client + s3_client = create_s3_client(aws_config) + + # List S3 bucket contents + contents = list_s3_bucket(s3_client, bucket_name) + + # Define valid extensions and excluded folders + valid_extensions = {'.mp3', '.mp4', '.md5', '.json', '.pdf'} + excluded_folders = {'DOCUMENTAZIONE_FOTOGRAFICA/', 'TEST-FOLDER-DEV/', 'FILE/'} + + # Extract and filter file names + s3_file_names = [ + content['Key'] for content in contents + if any(content['Key'].endswith(ext) for ext in valid_extensions) and + not any(content['Key'].startswith(folder) for folder in excluded_folders) + ] + + s3_only_mp4_file_names = [ + content['Key'] for content in contents + if content['Key'].endswith('.mp4') and + not any(content['Key'].startswith(folder) for folder in excluded_folders) + ] + + total_file_s3mp4 = len(s3_only_mp4_file_names) + logging.info(f"Total number of distinct .mp4 files in the S3 bucket before import: {total_file_s3mp4}") + + # filter_s3_files_not_in_db + # --- Get all DB filenames in one call --- + db_file_names = get_distinct_filenames_from_db() + + # --- Keep only those not in DB --- + file_names = [f for f in s3_file_names if f not in db_file_names] + + # Print the total number of files + total_file_db = len(db_file_names) + logging.info(f"Total number of distinct files in the database before import: {total_file_db}") + total_files_s3 = len(s3_file_names) + logging.info(f"Total number of the valid (mp3,mp4,md5,json,pdf) files in the S3 bucket before DB filter: {total_files_s3}") + total_files = len(file_names) + logging.info(f"Total number of the valid (mp3,mp4,md5,json,pdf) files after DB filter: {total_files}") + + # Count files with .mp4 and .mp3 extensions + mp4_count = sum(1 for file in s3_file_names if file.endswith('.mp4')) + mp3_count = sum(1 for file in s3_file_names if file.endswith('.mp3')) + md5_count = sum(1 for 
file in s3_file_names if file.endswith('.md5')) + pdf_count = sum(1 for file in s3_file_names if file.endswith('.pdf')) + json_count = sum(1 for file in s3_file_names if file.endswith('.json')) + mov_count = sum(1 for file in s3_file_names if file.endswith('.mov')) + # jpg_count = sum(1 for file in file_names if file.endswith('.jpg')) + # file directory + avi_count = sum(1 for file in s3_file_names if file.endswith('.avi')) + m4v_count = sum(1 for file in s3_file_names if file.endswith('.m4v')) + # Log the counts + # Get the logger instance + logger = logging.getLogger() + # Use the logger instance to log custom info + logging.warning("Number of .mp4 files on S3 bucket (%s): %s", bucket_name, mp4_count) + logging.warning("Number of .mp3 files on S3 bucket (%s): %s", bucket_name, mp3_count) + logging.warning("Number of .md5 files on S3 bucket (%s): %s", bucket_name, md5_count) + logging.warning("Number of .pdf files on S3 bucket (%s): %s", bucket_name, pdf_count) + logging.warning("Number of .json files on S3 bucket (%s): %s", bucket_name, json_count) + logging.warning("Number of .mov files on S3 bucket (%s): %s", bucket_name, mov_count) + if mp4_count != pdf_count: + logging.error("Number of .mp4 files is not equal to number of .pdf files") + logging.error("Abort Import Process due to missing files") + # return + if mp3_count + mp4_count != json_count: + logging.error("Number of .mp3 files + number of .mp4 files is not equal to number of .json files") + logging.error("Abort Import Process due to missing files") + # return + if mp3_count + mp4_count != md5_count: + logging.error("Number of .mp3 files + number of .mp4 files is not equal to number of .md5 files") + logging.error("Abort Import Process due to missing files") + # return + + # Try to parse S3 files + try: + # if DRY RUN is set to True, the files will not be uploaded to the database + + logging.warning("DRY RUN is set to TRUE - No files will be added to the database") + # set the tuples to zero + 
uploaded_files_count, warning_files_count, error_files_count = (0, 0, 0) + + logging.warning("Total number of files (mp3+mp4) with warnings: %s. (Probably already existing in the DB)", warning_files_count) + logging.warning("Total number of files with errors: %s", error_files_count) + logging.warning("Total number of files uploaded: %s", uploaded_files_count) + logging.warning("All files parsed") + except Exception as e: + logging.error(f"An error occurred while parsing S3 files: {e}") + handle_general_error(e) + + # Check results + # connect to database + conn = psycopg2.connect(**db_config) + cur = conn.cursor() + # function count_files that are wav and mov in db + # Map file extensions (include leading dot) to mime types + EXTENSION_MIME_MAP = { + '.avi': 'video/x-msvideo', + '.mov': 'video/mov', + '.wav': 'audio/wav', + '.mp4': 'video/mp4', + '.m4v': 'video/mp4', + '.mp3': 'audio/mp3', + '.mxf': 'application/mxf', + '.mpg': 'video/mpeg', + } + + # populate mime_type list with all relevant MediaInfo/MIME values + mime_type = [ + 'video/x-msvideo', # .avi + 'video/mov', # .mov + 'audio/wav', # .wav + 'video/mp4', # .mp4, .m4v + 'audio/mp3', # .mp3 + 'application/mxf', # .mxf + 'video/mpeg', # .mpg + ] + + logging.info(f"Mime types for counting files: {mime_type}") + + all_files_on_db = count_files(cur, mime_type,'*', False) + mov_files_on_db = count_files(cur,['video/mov'],'.mov', False ) + mxf_files_on_db = count_files(cur,['application/mxf'],'.mxf', False ) + mpg_files_on_db = count_files(cur,['video/mpeg'],'.mpg', False ) + avi_files_on_db = count_files(cur,['video/x-msvideo'],'.avi', False ) + m4v_files_on_db = count_files(cur,['video/mp4'],'.m4v', False ) + mp4_files_on_db = count_files(cur,['video/mp4'],'.mp4', False ) + wav_files_on_db = count_files(cur,['audio/wav'],'.wav', False ) + mp3_files_on_db = count_files(cur,['audio/mp3'],'.mp3', False ) + + # mov + m4v + avi + mxf + mpg + logging.warning(f"Number of all video files in the database: 
{all_files_on_db}") + logging.warning(f"Number of .mov files in the database: {mov_files_on_db} and S3: {mov_count} ") + logging.warning(f"Number of .mp4 files in the database: {mp4_files_on_db} and S3: {mp4_count}") + + # compare the mp4 name and s3 name and report the missing files in the 2 lists a print the list + missing_mp4s = [f for f in file_names if f.endswith('.mp4') and f not in db_file_names] + logging.warning(f"Missing .mp4 files in DB compared to S3: {missing_mp4s}") + + + logging.warning(f"Number of .wav files in the database: {wav_files_on_db} ") + logging.warning(f"Number of .mp3 files in the database: {mp3_files_on_db} and S3: {mp3_count}") + logging.warning(f"Number of .avi files in the database: {avi_files_on_db} ") + logging.warning(f"Number of .m4v files in the database: {m4v_files_on_db} ") + logging.warning(f"Number of .mxf files in the database: {mxf_files_on_db} ") + logging.warning(f"Number of .mpg files in the database: {mpg_files_on_db} ") + + logging.warning(f"Total file in s3 before import {total_files}") + + # time elapsed + end_time = time.time() # Record end time + + elapsed_time = end_time - start_time + logging.warning(f"Processing completed. Time taken: {elapsed_time:.2f} seconds") + + + + except Exception as e: + handle_general_error(e) + except FileNotFoundError as e: + handle_file_not_found_error(e) + except ValueError as e: + handle_value_error(e) + + +if __name__ == "__main__": + try: + # Setup logging using standard TimedRotatingFileHandler handlers. + # Rely on the handler's built-in rotation; don't call doRollover manually. 
+ logger, rotating_handler, error_handler, warning_handler = setup_logging() + + # Load configuration settings + aws_config, db_config, ach_config, bucket_name, ach_variables = config.load_config() + + logging.info("Config loaded, and logging setup") + + # Run the main process + main_process(aws_config, db_config, ach_config, bucket_name, ach_variables) + + except Exception as e: + logging.error(f"An error occurred: {e}") \ No newline at end of file diff --git a/cron_launch.sh b/cron_launch.sh new file mode 100644 index 0000000..51d8e62 --- /dev/null +++ b/cron_launch.sh @@ -0,0 +1,12 @@ +#!/bin/bash + +# Set the working directory +cd /app + +# Source the environment variables +set -a +[ -f /app/.env ] && source /app/.env +set +a + +# Run the Python script +/usr/local/bin/python /app/main.py >> /var/log/cron.log 2>&1 \ No newline at end of file diff --git a/db_utils.py b/db_utils.py new file mode 100644 index 0000000..8ed3bd4 --- /dev/null +++ b/db_utils.py @@ -0,0 +1,618 @@ +""" db utils.py +Description: + This module provides utility functions for interacting with a PostgreSQL database using psycopg2. + It includes functions for checking inventory codes, adding file records and relationships, + retrieving support IDs, and executing queries. +Functions: + check_inventory_in_db(s3_client, cur, base_name): + Checks if the inventory code exists in the database and validates its format. + check_objkey_in_file_db(cur, base_name): + Checks if the object key exists in the database. + add_file_record_and_relationship(s3_client, cur, base_name, ach_variables): + Adds a file record and its relationships to the database and uploads a log file to S3. + add_file_record(cur, editor_id, approver_id, disk_size, file_availability_dict, notes, base_name, extension, storage_location, file_type, custom_data_in=None): + Adds a file record to the database. 
+ add_file_support_relationship(cur, editor_id, approver_id, file_id, support_id, status): + Adds a relationship between a file and a support record in the database. + add_file_relationship(cur, file_a_id, file_b_id, file_relation_dict, custom_data, status, editor_id, approver_id): + Adds a relationship between two files in the database. + retrieve_support_id(cur, inventory_code): + Retrieves the support ID for a given inventory code from the database. + retrieve_digital_file_names(s3_client, cur, base_name, digital_file_name_in): + Retrieves digital file names from the database that match the given base_name. + get_db_connection(db_config): + Establishes a connection to the PostgreSQL database using the provided configuration. + execute_query(conn, query, params=None): + Executes a SQL query on the database. +""" +import psycopg2 +from psycopg2 import sql +import logging +from datetime import datetime +import re +from email_utils import handle_error +import json +import os +import config + +# Map file extensions (include leading dot) to mime types +EXTENSION_MIME_MAP = { + '.avi': 'video/x-msvideo', + '.mov': 'video/mov', + '.wav': 'audio/wav', + '.mp4': 'video/mp4', + '.m4v': 'video/mp4', + '.mp3': 'audio/mp3', + '.mxf': 'application/mxf', + '.mpg': 'video/mpeg', +} + +def get_mime_for_extension(extension: str) -> str: + """Return the mime type for an extension. Accepts with or without leading dot. + + Falls back to 'application/octet-stream' when unknown. + """ + if not extension: + return 'application/octet-stream' + if not extension.startswith('.'): + extension = f'.{extension}' + return EXTENSION_MIME_MAP.get(extension.lower(), 'application/octet-stream') + +def get_distinct_filenames_from_db(): + """Retrieve distinct digital file names from the Postgres DB. + + This helper loads DB configuration via `config.load_config()` and then + opens a connection with `get_db_connection(db_config)`. 
+ """ + # load db_config from project config helper (aws_config, db_config, ach_config, bucket_name, ach_variables) + try: + _, db_config, _, _, _ = config.load_config() + except Exception: + # If config.load_config isn't available or fails, re-raise with a clearer message + raise RuntimeError("Unable to load DB configuration via config.load_config()") + + conn = get_db_connection(db_config) + try: + with conn.cursor() as cur: + # cur.execute("SELECT DISTINCT digital_file_name FROM file;") + cur.execute( + """SELECT DISTINCT digital_file_name + FROM file + WHERE digital_file_name ~ '\\.(mp3|mp4|md5|json|pdf)$';""" + ) + rows = cur.fetchall() + # Flatten list of tuples -> simple set of names + return {row[0] for row in rows if row[0] is not None} + finally: + conn.close() + +# Function to check if the inventory code exists in the database +def check_inventory_in_db(s3_client, cur, base_name): + logging.debug("Executing check_inventory_in_db") + # Load the configuration from the .env file + # aws_config, db_config, ach_config, bucket_name, ach_variables = config.load_config() + + # Define the pattern for the inventory code + media_tipology_A = ['MCC', 'OA4', 'DAT'] + # TODO add other tipologies: AVI, M4V, MOV, MP4, MXF, MPG (done 04112025) + media_tipology_V = [ + 'OV1', 'OV2', 'UMT', 'VHS', 'HI8', 'VD8', 'BTC', 'DBT', 'IMX', 'DVD', + 'CDR', 'MDV', 'DVC', 'HDC', 'BRD', 'CDV', + 'AVI', 'M4V', 'MOV', 'MP4', 'MXF', 'MPG' # add for "file" folders 04112025 + ] + + # list of known mime types (derived from EXTENSION_MIME_MAP) + mime_type = list({v for v in EXTENSION_MIME_MAP.values()}) + + try: + logging.info(f"SUPPORT TYPOLOGY : {base_name[3:6]}") + if base_name[3:6] == 'OA4': + pattern = r'^[VA][OC]-[A-Z0-9]{3}-\d{5}_\d{2}$' # include the _\d{2} for OA4 + truncated_base_name = base_name[:15] + logging.info(f"type is OA4: {truncated_base_name}") + elif base_name[3:6] == 'MCC': + pattern = r'^[VA][OC]-[A-Z0-9]{3}-\d{5}_[AB]$' # include the _[AB] for MCC + 
truncated_base_name = base_name[:14] + logging.info(f"type is MCC: {truncated_base_name}") + else: + # Check the base_name format with regex pattern first + pattern = r'^[VA][OC]-[A-Z0-9]{3}-\d{5}$' + truncated_base_name = base_name[:12] + logging.info(f"type is default: {truncated_base_name}") + + logging.info(f"Checking inventory code {truncated_base_name} with pattern {pattern}...") + # Validate the string + try: + if not re.match(pattern, truncated_base_name): + error_message = f"Invalid format for base_name {truncated_base_name}" + logging.error(error_message) + raise ValueError(error_message) # Create and raise the exception + else : + # Extract the first character and the 3 central characters + first_char = truncated_base_name[0] + central_chars = truncated_base_name[3:6] + + # Check the corresponding list based on the first character + if first_char == 'A': # Check the corresponding list based on the first character + if central_chars not in media_tipology_A: + logging.error(f"Invalid media tipology for base_name {truncated_base_name}") + return False, None + elif first_char == 'V': # Check the corresponding list based on the first character + if central_chars not in media_tipology_V: + logging.error(f"Invalid media tipology for base_name {truncated_base_name}") + return False, None + else: # Invalid first character + logging.error(f"Invalid first character for base_name {truncated_base_name}") + return False, None + + logging.info(f"Valid format for base_name {truncated_base_name}") + except ValueError as e: + # Handle the specific ValueError exception + logging.error(f"Caught a ValueError: {e}") + # Optionally, take other actions or clean up + return False, None + except Exception as e: + # Handle any other exceptions + logging.error(f"Caught an unexpected exception: {e}") + # Optionally, take other actions or clean up + return False, None + + # First query: Check if the truncated base_name matches an inventory code in the support table + check_query = 
sql.SQL(""" + SELECT 1 + FROM support + WHERE inventory_code LIKE %s + LIMIT 1; + """) + cur.execute(check_query, (f"{truncated_base_name[:12]}%",)) + result = cur.fetchone() + + if result: + logging.info(f"Inventory code {truncated_base_name[:12]} found in the database.") + # Call the function to retrieve digital file names, assuming this function is implemented + return True, truncated_base_name + else: + logging.info(f"Inventory code {truncated_base_name} not found in the database.") + handle_error(f"Inventory code {truncated_base_name} not found in the database.") + #raise ValueError(f"Inventory code {truncated_base_name} not found in the database.") + return False, None + + except Exception as e: + logging.error(f'Error checking inventory code {base_name}:', {e}) + raise e + +# Function to check if the object key exists in the database +def check_objkey_in_file_db(cur, base_name): + """ + Checks if the base_name matches digital_file_name in the file table. + + Args: + cur (cursor): The database cursor. + base_name (str): The base name to check in the database. + + Returns: + tuple: A tuple containing a boolean indicating if the base_name was found and the base_name itself or None. 
+ """ + logging.debug("Executing check_objkey_in_file_db") + + try: + # First query: Check if the base_name matches digital_file_name in the file table + check_query = sql.SQL(""" + SELECT 1 + FROM file + WHERE digital_file_name LIKE %s + LIMIT 1; + """) + cur.execute(check_query, (f"{base_name}%",)) + result = cur.fetchone() + + if result: + logging.info(f"Inventory code {base_name} found in the database.") + # Call the function to retrieve digital file names, assuming this function is implemented + return True + else: + logging.info(f"Inventory code {base_name} not found in the database.") + return False + + except Exception as e: + logging.error(f"Error checking inventory code {base_name}: {e}") + raise e + +# Function to add a file record and its relationship to the support record +def add_file_record_and_relationship(s3_client, cur, base_name,ach_variables): + """ + Adds a file record and its relationships to the database and uploads a log file to S3. + This function performs the following steps: + 1. Loads configuration from the .env file. + 2. Retrieves the support ID for the given base name. + 3. Adds a new file record for the conservative copy. + 4. Adds a relationship between the new file and the support ID. + 5. If the file extension is .mp4 or .mp3, adds a new MP4/MP3 file record and its relationships. + 6. If the file extension is .mp4 and a PDF exists, adds a PDF file record and its relationship to the master file. + 7. Uploads a log file to S3 if all operations are successful. + Args: + s3_client (boto3.client): The S3 client used to upload the log file. + cur (psycopg2.cursor): The database cursor used to execute SQL queries. + base_name (str): The base name of the file. + ach_variables (dict): A dictionary containing various variables and configurations needed for the operation. + Returns: + bool: True if the operation is successful, False otherwise. + Raises: + Exception: If any error occurs during the operation, it is logged and re-raised. 
+ """ + # Load the configuration from the .env file + aws_config, db_config, ach_config, bucket_name, _ = config.load_config() + + editor_id = ach_config['ach_editor_id'] + approver_id = ach_config['ach_approver_id'] + # Append current date and time to notes in format: yyyy mm dd HH MM SS + now_dt = datetime.now() + date_part = now_dt.strftime('%Y %m %d') + time_part = now_dt.strftime('%H %M %S') + notes = f"{ach_config.get('ach_notes','') } {date_part} {time_part}" + + ach_variables['file_copia_conservativa'] = ach_variables['custom_data_in'].get('mediainfo', {}).get("media", {}).get("@ref", "") + logging.info(f"ach_variables['file_copia_conservativa']a: {ach_variables['file_copia_conservativa']}") + + logging.debug("Executing add_file_record_and_relationship") + + try: + # Retrieve the support ID for the given base name + support_id = retrieve_support_id(cur, ach_variables['inventory_code']) + if support_id: + logging.info(f"Found support_id {support_id} for base_name {base_name}.") + + ach_variables['objectKeys']['conservative_copy'] = ach_variables['file_copia_conservativa'] # must remove _H264 + # replace in ach_variables['file_copia_conservativa'] _H264 with empty string + ach_variables['objectKeys']['conservative_copy'] = ach_variables['objectKeys']['conservative_copy'].replace('_H264', '') + + # Add a new file record and get the new file ID + file_availability_dict = 7 # Place Holder + # add a new file record for the "copia conservativa" + ach_variables['custom_data_in']['media_usage'] = 'master' # can be "copia conservativa" + # determine master mime type from the file extension + master_mime_type = get_mime_for_extension(ach_variables.get('extension')) + + new_file_id = add_file_record( + cur, + editor_id, + approver_id, + ach_variables['disk_size'], + file_availability_dict, + notes, + ach_variables['objectKeys']['conservative_copy'], + ach_variables['conservative_copy_extension'], + ach_config['ach_storage_location'], + master_mime_type, + 
ach_variables['custom_data_in'] + ) + + if new_file_id: + logging.info(f"Added file record for {base_name} with file_id {new_file_id}.") + # Add a relationship between the new file and the support ID + status = '{"saved": true, "status": "approved"}' # Define the status JSON + add_file_support_relationship(cur, editor_id, approver_id, new_file_id, support_id, status) + + # If the file extension is .mp4, add a new MP4/MP3 file record + mime_type = get_mime_for_extension(ach_variables.get('extension')) + if ach_variables['extension'] == '.mp4' or ach_variables['extension'] == '.mp3': + file_availability_dict = 8 # Hot Storage + mp4_file_id = add_file_record( + cur, + editor_id, + approver_id, + ach_variables['media_disk_size'], + file_availability_dict, + notes, + ach_variables['objectKeys']['media'], + ach_variables['extension'], # deprecated + {"storage_type": "s3", "storage_location_id": 5}, + mime_type, + {"media_usage": "streaming"} + ) + if mp4_file_id: + logging.info(f"Added MP4/MP3 file record for {base_name} with file_id {mp4_file_id}.") + # Add a relationship between the MP4 file and the support ID + status = '{"saved": true, "status": "approved"}' # Define the status JSON + add_file_support_relationship(cur, editor_id, approver_id, mp4_file_id, support_id, status) + + # Add a relationship between the streming file(mp4_file_id) and the master file(new_file_id) + status = '{"saved": true, "status": "approved"}' # Define the status JSON + file_relation_dict = 10 # Define the relationship dictionary: È re encoding di master + add_file_relationship(cur, new_file_id, mp4_file_id, file_relation_dict, '{}', status, editor_id, approver_id) + + # the .mp4 should also have the QC in pdf format: add file as PDF relation with file MASTER as documentation + if ach_variables['extension'] == '.mp4' and ach_variables['pdf_disk_size'] > 0: + file_availability_dict = 8 # Hot Storage + pdf_file_id = add_file_record( + cur, + editor_id, + approver_id, + 
# Function to add a file record
def add_file_record(cur, editor_id, approver_id, disk_size, file_availability_dict, notes, base_name, extension, storage_location, file_type, custom_data_in=None):
    """Create a file record through the DB function public.add_file02.

    Parameters mirror the stored function's arguments; `storage_location`,
    `file_type` and `custom_data_in` are JSON-serialised before being sent.
    `extension` is accepted for interface compatibility but is not used by
    the query (one caller already marks it deprecated).

    Returns the new file id (first column of the result row) or None when
    the function returned no row. Re-raises any cursor error after logging.
    """
    try:
        cur.execute("""
            SELECT public.add_file02(
                %s, -- editor_id_in
                %s, -- approver_id_in
                %s, -- disk_size_in
                %s, -- file_availability_dict_in
                %s, -- notes_in
                %s, -- digital_file_name_in
                %s, -- original_file_name_in
                %s, -- status_in
                %s, -- storage_location_in
                %s, -- file_type_in
                %s  -- custom_data_in
            )
        """, (
            editor_id,
            approver_id,
            disk_size,
            file_availability_dict,
            notes,
            base_name,                                # digital_file_name_in
            base_name,                                # original_file_name_in
            '{"saved": true, "status": "approved"}',  # status_in
            json.dumps(storage_location),             # storage_location_in
            json.dumps({"type": file_type}),          # file_type_in
            json.dumps(custom_data_in),               # custom_data_in
        ))
        file_result = cur.fetchone()
        return file_result[0] if file_result else None
    except Exception as e:
        # Bug fix: logging.error(f'...:', e) passed a stray %-argument with
        # no placeholder, so the exception text was never logged.
        logging.error('Error adding file record: %s', e)
        raise


# Function to add a file <-> support relationship
def add_file_support_relationship(cur, editor_id, approver_id, file_id, support_id, status):
    """Link a file record to a support record via public.add_rel_file_support.

    `status` is a JSON string (e.g. '{"saved": true, "status": "approved"}').
    The procedure returns no result; success is only logged.
    """
    try:
        cur.execute("""
            CALL public.add_rel_file_support(
                editor_id_in := %s,
                approver_id_in := %s,
                file_id_in := %s,
                support_id_in := %s,
                status_in := %s
            )
        """, (
            editor_id,    # editor_id_in
            approver_id,  # approver_id_in
            file_id,      # file_id_in
            support_id,   # support_id_in
            status,       # status_in
        ))
        logging.info(f"Added file support relationship for file_id {file_id}.")
    except Exception as e:
        # Bug fix: same broken logging.error(f'...', e) pattern as above.
        logging.error('Error adding file support relationship: %s', e)
        raise


# Function to add a file <-> file relationship
def add_file_relationship(cur, file_a_id, file_b_id, file_relation_dict, custom_data, status, editor_id, approver_id):
    """Relate two file records (e.g. streaming copy -> master) via public.add_rel_file.

    `file_relation_dict` selects the relationship kind (dictionary id);
    `custom_data` and `status` are JSON strings. The procedure returns nothing.
    """
    try:
        cur.execute("""
            CALL public.add_rel_file(
                editor_id_in := %s,
                approver_id_in := %s,
                file_a_id_in := %s,
                file_b_id_in := %s,
                file_relation_dict_in := %s,
                status_in := %s,
                custom_data_in := %s
            )
        """, (
            editor_id,           # editor_id_in
            approver_id,         # approver_id_in
            file_a_id,           # file_a_id_in
            file_b_id,           # file_b_id_in
            file_relation_dict,  # file_relation_dict_in
            status,              # status_in
            custom_data,         # custom_data_in
        ))
        logging.info(f"Added file relationship between file_id {file_a_id} and file_id {file_b_id}.")
    except Exception as e:
        logging.error(f'Error adding file relationship: {e}', exc_info=True)
        raise


# Function to retrieve the support ID for a given inventory code
def retrieve_support_id(cur, inventory_code):
    """Return the highest support id whose inventory_code starts with
    *inventory_code* and whose support_type is 'video' or 'audio', else None."""
    try:
        cur.execute("""
            SELECT MAX(s.id) AS id
            FROM support s
            WHERE s.inventory_code LIKE %s::text
              AND (s.support_type ->> 'type' LIKE 'video' OR s.support_type ->> 'type' LIKE 'audio')
            GROUP BY s.h_base_record_id
        """, (f"{inventory_code}%",))
        support_result = cur.fetchone()
        support_id = support_result[0] if support_result and support_result[0] else None
        logging.info(f"support_result: {support_id}")
        return support_id
    except Exception as e:
        # Bug fix: broken logging.error(f'...', e) call replaced.
        logging.error('Error retrieving support_id: %s', e)
        raise


# Function to check whether a digital_file_name is already registered
def retrieve_digital_file_names(s3_client, cur, base_name, digital_file_name_in):
    """Return True when no matching digital file name exists yet (safe to add
    a new record), False when a record with the same base name already exists.

    `s3_client` is unused here but kept for interface compatibility.
    NOTE(review): the support lookup uses LIKE with the first 12 characters of
    *base_name* and no wildcard, i.e. effectively an exact match — confirm
    this is intended (retrieve_support_id appends '%').
    """
    try:
        logging.info(f"Retrieving digital file names for inventory code {base_name}... and digital_file_name {digital_file_name_in}")
        query = sql.SQL("""
            WITH supprtId AS (
                SELECT s.id
                FROM support s
                WHERE s.inventory_code LIKE %s
            ), rel_fs AS (
                SELECT rfs.file_id
                FROM rel_file_support rfs
                WHERE rfs.support_id IN (SELECT id FROM supprtId)
            )
            SELECT f.id AS file_id, f.digital_file_name
            FROM file f
            WHERE f.id IN (SELECT file_id FROM rel_fs)
               OR f.digital_file_name ILIKE %s
        """)
        cur.execute(query, (f"{base_name[:12]}", f"{digital_file_name_in}.%"))
        results = cur.fetchall()

        for file_id, digital_file_name in results:
            logging.info(f"File ID: {file_id}, Digital File Name: {digital_file_name}")

        # Keep only rows whose base name (directory and extension stripped)
        # equals the candidate file's base name.
        target = os.path.splitext(os.path.basename(digital_file_name_in))[0]
        matching_files = []
        for _, digital_file_name in results:
            base_name_from_file = os.path.splitext(os.path.basename(digital_file_name))[0]
            logging.info(f"base_name_from_file: {base_name_from_file}")
            if base_name_from_file == target:
                matching_files.append(digital_file_name)

        logging.info(f"Matching digital file names: {matching_files}")
        if matching_files:
            logging.info(f"Found the following matching digital file names: {matching_files} do not add")
            return False
        logging.info(f"No matching digital file names found for inventory code {base_name}. try to add new record")
        return True
    except Exception as e:
        # Bug fix: broken logging.error(f'...', e) call replaced.
        logging.error('Error retrieving digital file names for inventory code %s: %s', base_name, e)
        raise


# Function to get a db connection
def get_db_connection(db_config):
    """Open and return a new psycopg2 connection built from *db_config* kwargs."""
    try:
        return psycopg2.connect(**db_config)
    except psycopg2.Error as e:
        logging.error(f"Error connecting to the database: {e}")
        raise


# Function to execute a query and commit
def execute_query(conn, query, params=None):
    """Execute *query* with optional *params* on a fresh cursor, then commit."""
    try:
        with conn.cursor() as cur:
            cur.execute(query, params)
            conn.commit()
    except psycopg2.Error as e:
        logging.error(f"Error executing query: {e}")
        raise


# Function to count files in the database
def count_files(cur, mime_type, extension='*', file_dir=True):
    """Count distinct base records in `file` matching the given filters.

    - mime_type: a single MIME string or a list/tuple of them; falsy skips
      the filter.
    - extension: file-name suffix filter; '*' or None means any extension.
    - file_dir: True -> only original_file_name values starting with 'FILE',
      False -> exclude them.

    Returns an int (0 when the query yields no row).
    """
    try:
        query = (
            "SELECT COUNT(DISTINCT h_base_record_id) "
            "FROM file "
            "WHERE file_type ->> 'type' IS NOT NULL"
        )
        args = ()

        # mime_type list expands into ARRAY[...] with one placeholder each.
        if mime_type:
            if isinstance(mime_type, (list, tuple)):
                placeholders = ','.join(['%s'] * len(mime_type))
                query += f" AND file_type ->> 'type' = ANY(ARRAY[{placeholders}])"
                args += tuple(mime_type)
            else:
                query += " AND file_type ->> 'type' = %s"
                args += (mime_type,)

        if extension != '*' and extension is not None:
            query += " AND digital_file_name ILIKE %s"
            args += (f'%{extension}',)

        if file_dir:
            query += " AND original_file_name ILIKE %s"
            args += ('FILE%',)
        else:
            query += " AND original_file_name NOT ILIKE %s"
            args += ('FILE%',)

        logging.debug("Executing count_files SQL: %s -- args: %s", query, args)
        cur.execute(query, args)
        result = cur.fetchone()
        return result[0] if result else 0
    except Exception as e:
        logging.error('Error counting files: %s', e)
        raise
with individual placeholders + if mime_type: + if isinstance(mime_type, (list, tuple)): + # Create placeholders for each mime type and append to args + placeholders = ','.join(['%s'] * len(mime_type)) + query += f" AND file_type ->> 'type' = ANY(ARRAY[{placeholders}])" + args += tuple(mime_type) + else: + # single value + query += " AND file_type ->> 'type' = %s" + args += (mime_type,) + + # Add extension condition if not wildcard + if extension != '*' and extension is not None: + query += " AND digital_file_name ILIKE %s" + args = args + (f'%{extension}',) + # If extension is '*', no additional condition is needed (matches any extension) + + # Add file directory condition based on file_dir parameter + if file_dir: + # Only files in directory (original_file_name starts with 'FILE') + query += " AND original_file_name ILIKE %s" + args = args + ('FILE%',) + else: + # Exclude files in directory (original_file_name does not start with 'FILE') + query += " AND original_file_name NOT ILIKE %s" + args = args + ('FILE%',) + + try: + logging.debug("Executing count_files SQL: %s -- args: %s", query, args) + cur.execute(query, args) + result = cur.fetchone() + # fetchone() returns a sequence or None; protect against unexpected empty sequences + if not result: + return 0 + try: + return result[0] + except (IndexError, TypeError) as idx_e: + logging.exception("Unexpected result shape from count query: %s", result) + raise + except Exception as e: + logging.exception('Error executing count_files with query: %s', query) + raise + except Exception as e: + logging.error('Error counting files: %s', e) + raise e + + diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..55bd517 --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,26 @@ +services: + app: + build: . 
+ container_name: ACH_server_media_importer + volumes: + - logs:/app/logs # Add this line to map the logs volume + env_file: + - .env + environment: + - AWS_ACCESS_KEY_I=${AWS_ACCESS_KEY_ID} + - AWS_SECRET_ACCESS_KEY=${AWS_SECRET_ACCESS_KEY} + - AWS_REGION=${AWS_REGION} + - AWS_ENDPOINT_URL=${AWS_ENDPOINT_URL} + - BUCKET_NAME=${BUCKET_NAME} + - DB_HOST=${DB_HOST} + - DB_NAME=${DB_NAME} + - DB_USER=${DB_USER} + - SMTP_SERVER=${SMTP_SERVER} + - SMTP_PORT=${SMTP_PORT} + - SMTP_USER=${SMTP_USER} + - SMTP_PASSWORD=${SMTP_PASSWORD} + - SENDER_EMAIL=${SENDER_EMAIL} + - EMAIL_RECIPIENTS=${EMAIL_RECIPIENTS} + restart: unless-stopped +volumes: + logs: # Define the named volume \ No newline at end of file diff --git a/email_utils.py b/email_utils.py new file mode 100644 index 0000000..59a9cb5 --- /dev/null +++ b/email_utils.py @@ -0,0 +1,106 @@ +import os +import smtplib +import traceback +import logging +from email.mime.multipart import MIMEMultipart +from email.mime.text import MIMEText +from email.mime.base import MIMEBase +from email import encoders +from email.utils import formataddr +from dotenv import load_dotenv + +# Load environment variables from .env file +load_dotenv() + +# Email configuration +SMTP_SERVER = os.getenv('SMTP_SERVER') +SMTP_PORT = os.getenv('SMTP_PORT') +SMTP_USER = os.getenv('SMTP_USER') +SMTP_PASSWORD = os.getenv('SMTP_PASSWORD') +SENDER_EMAIL = os.getenv('SENDER_EMAIL') +# Split env recipient lists safely and strip whitespace; default to empty list +def _split_env_list(varname): + raw = os.getenv(varname, '') + return [s.strip() for s in raw.split(',') if s.strip()] + +EMAIL_RECIPIENTS = _split_env_list('EMAIL_RECIPIENTS') +ERROR_EMAIL_RECIPIENTS = _split_env_list('ERROR_EMAIL_RECIPIENTS') +SUCCESS_EMAIL_RECIPIENTS = _split_env_list('SUCCESS_EMAIL_RECIPIENTS') + + +# Send email with attachment +def send_email_with_attachment(subject, body, attachment_path=None, email_recipients=None): + sender_name="Art.c.hive Support for ARKIVO" + try: + # 
Create a multipart message + msg = MIMEMultipart() + msg['From'] = formataddr((sender_name, SENDER_EMAIL)) + # if email recipent not defined use EMAIL_RECIPIENTS + if email_recipients: + msg['To'] = ', '.join(email_recipients) + else: + msg['To'] = ', '.join(EMAIL_RECIPIENTS) + + msg['Subject'] = subject + + # Attach the body with the msg instance + msg.attach(MIMEText(body, 'plain')) + + # Attach the file if provided + if attachment_path: + if os.path.exists(attachment_path): + with open(attachment_path, "rb") as attachment: + part = MIMEBase('application', 'octet-stream') + part.set_payload(attachment.read()) + encoders.encode_base64(part) + part.add_header('Content-Disposition', f'attachment; filename= {os.path.basename(attachment_path)}') + msg.attach(part) + else: + logging.warning(f"Attachment path {attachment_path} does not exist. Skipping attachment.") + + # Create SMTP session for sending the mail + recipients_list = email_recipients if email_recipients else EMAIL_RECIPIENTS + if isinstance(recipients_list, str): + recipients_list = [recipients_list] + with smtplib.SMTP(SMTP_SERVER, SMTP_PORT) as server: + server.starttls() # Enable security + server.login(SMTP_USER, SMTP_PASSWORD) # Login with mail_id and password + text = msg.as_string() + server.sendmail(SENDER_EMAIL, recipients_list, text) + + logging.info("Email sent successfully.") + except Exception as e: + logging.error(f"Failed to send email: {e}") + +# Create the email message +def send_error_email(subject, body, recipients): + try: + sender_email = os.getenv('SENDER_EMAIL') + smtp_server = os.getenv('SMTP_SERVER') + smtp_port = int(os.getenv('SMTP_PORT')) + smtp_user = os.getenv('SMTP_USER') + smtp_password = os.getenv('SMTP_PASSWORD') + + msg = MIMEMultipart() + msg['From'] = sender_email + msg['To'] = ", ".join(recipients) + msg['Subject'] = subject + msg.attach(MIMEText(body, 'plain')) + + with smtplib.SMTP(smtp_server, smtp_port) as server: + server.starttls() + server.login(smtp_user, 
smtp_password) + server.sendmail(sender_email, recipients, msg.as_string()) + + logging.error("Error email sent successfully") + except Exception as e: + logging.error(f"Failed to send error email: {e}") + +# Handle error +def handle_error(e): + error_trace = traceback.format_exc() + subject = "Error Notification" + body = f"An error occurred:\n\n{error_trace}" + recipients = os.getenv('EMAIL_RECIPIENTS', '').split(',') + send_error_email(subject, body, recipients) + raise e diff --git a/error_handler.py b/error_handler.py new file mode 100644 index 0000000..1b75b96 --- /dev/null +++ b/error_handler.py @@ -0,0 +1,22 @@ +# error_handler.py + +import logging + +def handle_general_error(e): + logging.error(f'An error occurred during the process: {e}') + # Add any additional error handling logic here + +def handle_file_not_found_error(e): + logging.error(f"File not found error: {e}") + # Add any additional error handling logic here + +def handle_value_error(e): + logging.error(f"Value error: {e}") + # Add any additional error handling logic here + +def handle_error(error_message): + logging.error(f"Error: {error_message}") + +class ClientError(Exception): + """Custom exception class for client errors.""" + pass \ No newline at end of file diff --git a/file_utils.py b/file_utils.py new file mode 100644 index 0000000..a62b90b --- /dev/null +++ b/file_utils.py @@ -0,0 +1,349 @@ +import os +import logging +from logging.handlers import RotatingFileHandler +import json + +from utils import check_video_info, check_audio_info + +from error_handler import handle_error +from botocore.exceptions import ClientError + +#from config import load_config, aws_config, bucket_name +import config + +def retrieve_file_contents(s3, base_name): + file_contents = {} + + # Retrieve the configuration values + # aws_config, db_config, ach_config, bucket_name, ach_variables = config.load_config() + _, _, _, bucket_name, _ = config.load_config() + + try: + # Define the file extensions as pairs + 
file_extensions = [['json', 'json'], ['md5', 'md5']] + + for ext_pair in file_extensions: + file_name = f"{base_name}.{ext_pair[0]}" + + try: + response = s3.get_object(Bucket=bucket_name, Key=file_name) + file_contents[ext_pair[1]] = response['Body'].read().decode('utf-8') + logging.info(f"Retrieved {ext_pair[1]} file content for base_name {base_name}.") + except ClientError as e: + # S3 returns a NoSuchKey error code when the key is missing. + code = e.response.get('Error', {}).get('Code', '') + if code in ('NoSuchKey', '404', 'NotFound'): + logging.warning(f"{file_name} not found in S3 (code={code}).") + # treat missing sidecars as non-fatal; continue + continue + else: + logging.error(f"Error retrieving {file_name}: {e}", exc_info=True) + # Re-raise other ClientError types + raise + except Exception as e: + logging.error(f'Error retrieving file contents for {base_name}: {e}', exc_info=True) + # Return empty JSON structure instead of raising to avoid tracebacks in callers + try: + return json.dumps({}) + except Exception: + return '{}' + + # Clean and format file_contents as proper JSON + try: + cleaned_contents = {} + + # Clean the contents + for key, value in file_contents.items(): + if isinstance(value, str): + # Remove trailing newlines or any other unwanted characters + cleaned_value = value.strip() + + # Attempt to parse JSON + try: + cleaned_contents[key] = json.loads(cleaned_value) + except json.JSONDecodeError: + cleaned_contents[key] = cleaned_value + else: + cleaned_contents[key] = value + + # Return the cleaned and formatted JSON + return json.dumps(cleaned_contents, indent=4) + except (TypeError, ValueError) as e: + logging.error(f'Error formatting file contents as JSON: {e}', exc_info=True) + raise e + +def check_related_files(s3, file_name_with_path, file, bucket_name): + """ + Check for related files in S3 based on the given file type. + Parameters: + - s3: The S3 client object. + - file_name_with_path: The name of the file with its path. 
+ - file: The file name. + - bucket_name: The name of the S3 bucket. + Returns: + None + Raises: + - FileNotFoundError: If a required file is not found in S3. + - ValueError: If a file has zero size. + - Exception: If an unexpected exception occurs. + """ + from s3_utils import check_file_exists_in_s3, get_file_size # avoid circular import + + import config + + # Load the configuration from the .env file + # aws_config, db_config, ach_config, bucket_name, ach_variables = config.load_config() + _, _, _, bucket_name, _ = config.load_config() + + ach_pdf_disk_size = 0 + + # Set required extensions based on the file type + if file.endswith('.mp4'): + required_extensions = ['json', 'md5', 'pdf'] + elif file.endswith('.mp3'): + required_extensions = ['json', 'md5'] + else: + required_extensions = [] + + logging.info(f"Required extensions: {required_extensions}") + for ext in required_extensions: + related_file = f"{file_name_with_path}.{ext}" + logging.info(f"Checking for related file: {related_file}") + + try: + if not check_file_exists_in_s3(s3, related_file,bucket_name): + error_message = f"Required file {related_file} not found in S3." + logging.error(error_message) + raise FileNotFoundError(error_message) + else: + logging.info(f"Found related file: {related_file}") + + except FileNotFoundError as e: + logging.error(f"Caught a FileNotFoundError: {e}") + + except Exception as e: + logging.error(f"Caught an unexpected exception: {e}") + + # Check the size of the related file + try: + if ext in ['json', 'md5', 'pdf']: + file_size = get_file_size(s3, bucket_name, related_file) + if file_size == 0: + error_message = f"File {related_file} has zero size." 
def _shorten_media_path(path):
    """Return 'parent/name' for *path*, normalising backslashes first.

    Falls back to the bare file name when the path has fewer than two
    components (the original split('/')[-2] raised IndexError in that case).
    """
    parts = path.replace("\\", "/").split('/')
    if len(parts) >= 2:
        return parts[-2] + '/' + parts[-1]
    return parts[-1]


def extract_and_validate_file_info(file_contents, file, ach_variables):
    """Normalise the sidecar JSON metadata for *file* and validate it.

    Parameters:
    - file_contents: dict with a 'json' key (parsed sidecar metadata) and an
      optional 'md5' key.
    - file: the media file name being processed (e.g. 'xxx.mp4' / 'xxx.mp3').
    - ach_variables: mutable state dict; 'custom_data_in' is written here and
      'file_fullpath' is read for cross-checking base names.

    Returns (custom_data, disk_size, conservative_copy_extension) where
    disk_size is an int or None and the extension (e.g. '.mkv') is None when
    the metadata carries no usable FileExtension (previously this case raised
    TypeError/NameError).

    Raises ValueError when the JSON holds no recognised metadata, or when the
    file base name matches neither metadata reference.
    """
    ach_custom_data_in = file_contents
    json_meta = ach_custom_data_in['json']
    logging.info(f"Extracted JSON contents: {json_meta}")

    # Classify the sidecar payload: combined, mediainfo-only, ffprobe-only.
    if 'mediainfo' in json_meta and 'ffprobe' in json_meta:
        ach_variables['custom_data_in'] = {
            "mediainfo": json_meta.get('mediainfo', {}),
            "ffprobe": json_meta.get('ffprobe', {}),
            "filename": json_meta.get('filename', ''),
            "md5": ach_custom_data_in.get('md5', ''),
        }
        logging.info("mediainfo and ffprobe metadata found in JSON file.")
    elif isinstance(json_meta.get('creatingLibrary'), dict) and json_meta['creatingLibrary'].get('name', '') == 'MediaInfoLib':
        # Raw MediaInfo export: the interesting keys sit one level down.
        # (Bug fix: the old .get('creatingLibrary', '').get(...) chain raised
        # AttributeError when the value was not a dict.)
        ach_variables['custom_data_in'] = {
            "mediainfo": json_meta,
            "md5": ach_custom_data_in.get('md5', ''),
        }
        logging.info("mediainfo metadata found in JSON file.")
    elif 'streams' in json_meta:
        ach_variables['custom_data_in'] = {
            "ffprobe": json_meta,
            "md5": ach_custom_data_in.get('md5', ''),
        }
        logging.info("ffprobe metadata found in JSON file.")
    else:
        ach_variables['custom_data_in'] = {
            "md5": ach_custom_data_in.get('md5', ''),
        }
        logging.error(f"No recognized data found in JSON file.{ach_custom_data_in} - {file_contents}")
        raise ValueError("No recognized data found in JSON file.")

    logging.info(f"Extracted JSON contents: {ach_variables['custom_data_in']}")

    # Pull FileSize / FileExtension from the MediaInfo "General" track.
    ach_disk_size = None
    # Bug fix: this was left unbound when no "General" track existed, causing
    # a NameError at the return statement.
    ach_conservative_copy_extension = None
    tracks = ach_variables['custom_data_in'].get('mediainfo', {}).get('media', {}).get('track', [])
    for track in tracks:
        if track.get('@type') == 'General':
            ach_disk_size = track.get('FileSize', None)
            logging.info(f"Disk size from JSON media.track.General: {ach_disk_size}")
            file_ext = track.get('FileExtension')
            # Bug fix: '.' + None raised TypeError; keep None when absent.
            ach_conservative_copy_extension = f".{file_ext}" if file_ext else None
            logging.info(f"FileExtension JSON media.track.General: {ach_conservative_copy_extension}")
            break  # only the first General track matters

    if ach_disk_size is not None:
        ach_disk_size = int(ach_disk_size)

    # MEDIAINFO: truncate the stored @ref path to 'dir/name'.
    if "media" in ach_variables['custom_data_in'].get("mediainfo", {}):
        media_ref = ach_variables['custom_data_in']['mediainfo']['media'].get("@ref", "")
        logging.info(f"Media ref mediainfo: {media_ref}")
        file_name = _shorten_media_path(media_ref)
        logging.info(f"Media file name (copia conservativa): {file_name}")
        ach_variables['custom_data_in']["mediainfo"]["media"]["@ref"] = file_name
        logging.info(f"Updated the truncated file_name at mediainfo.media.@ref {file_name}")
    else:
        logging.warning("mediainfo.media.@ref not found in JSON file.")

    # FFPROBE: same truncation for format.filename.
    if "format" in ach_variables['custom_data_in'].get("ffprobe", {}):
        media_ref = ach_variables['custom_data_in']['ffprobe']['format'].get("filename", "")
        logging.info(f"Media ref ffprobe: {media_ref}")
        file_name = _shorten_media_path(media_ref)
        logging.info(f"Media file name (copia conservativa): {file_name}")
        ach_variables['custom_data_in']["ffprobe"]["format"]["filename"] = file_name
        # Bug fix: this log used to read mediainfo.media.@ref, which raised
        # KeyError when only ffprobe metadata was present.
        logging.info(f"Updated the truncated file_name at ffprobe.format.filename {file_name}")
    else:
        logging.warning("ffprobe.format.filename not found in JSON file.")

    logging.info(f"JSON contents: {ach_variables['custom_data_in']}")

    # Re-parse if the payload is (still) a JSON string.
    if isinstance(ach_variables['custom_data_in'], str):
        ach_custom_data_in = json.loads(ach_variables['custom_data_in'])
    else:
        ach_custom_data_in = ach_variables['custom_data_in']

    # Cross-check the media base name against both metadata references.
    json_ref_mediainfo_path = ach_custom_data_in.get('mediainfo', {}).get("media", {}).get("@ref", "")
    json_ref_ffprobe_path = ach_custom_data_in.get('ffprobe', {}).get("format", {}).get("filename", "")
    logging.info(f"JSON file names: mediainfo: '{json_ref_mediainfo_path}', ffprobe: '{json_ref_ffprobe_path}', ach_file_fullpath: '{ach_variables['file_fullpath']}'")

    basename_fullpath = os.path.splitext(os.path.basename(ach_variables['file_fullpath']))[0]
    basename_fullpath = basename_fullpath.replace('_H264', '')
    basename_mediainfo = os.path.splitext(os.path.basename(json_ref_mediainfo_path))[0]
    basename_ffprobe = os.path.splitext(os.path.basename(json_ref_ffprobe_path))[0]

    if basename_fullpath != basename_mediainfo:
        logging.warning(f"ach_file_fullpath '{basename_fullpath}' does not match JSON mediainfo file name '{basename_mediainfo}'.")
    else:
        logging.info(f"ach_file_fullpath '{basename_fullpath}' matches JSON mediainfo file name '{basename_mediainfo}'.")

    if basename_fullpath != basename_ffprobe:
        logging.warning(f"ach_file_fullpath '{basename_fullpath}' does not match JSON ffprobe file name '{basename_ffprobe}'.")
    else:
        logging.info(f"ach_file_fullpath '{basename_fullpath}' matches JSON ffprobe file name '{basename_ffprobe}'.")

    if basename_fullpath != basename_mediainfo and basename_fullpath != basename_ffprobe:
        logging.error(f"ach_file_fullpath '{basename_fullpath}' does not match either JSON file name '{basename_mediainfo}' or '{basename_ffprobe}'.")
        raise ValueError(f"ach_file_fullpath '{basename_fullpath}' does not match either JSON file name '{basename_mediainfo}' or '{basename_ffprobe}'.")

    # Validate the stream metadata (best effort: failures are logged only).
    try:
        if file.endswith('.mp4'):
            result, message = check_video_info(ach_custom_data_in.get('mediainfo', {}))
            logging.info(f"Validation result for {file}: {message}")
        elif file.endswith('.mp3'):
            result, message = check_audio_info(ach_custom_data_in.get('mediainfo', {}))
            logging.info(f"Validation result for {file}: {message}")
        else:
            raise ValueError(f"Unsupported file type: {file}")

        if not result:
            logging.error(f"Validation failed for {file}: {message}")
    except ValueError as e:
        logging.error(f"Caught a ValueError: {e}")
    except Exception as e:
        logging.error(f"Caught an unexpected exception: {e}")

    # 'filename' is transient plumbing, not persisted custom data.
    ach_custom_data_in.pop('filename', None)
    return ach_custom_data_in, ach_disk_size, ach_conservative_copy_extension
ach_conservative_copy_extension + +def is_file_empty(file_path): + return os.path.exists(file_path) and os.path.getsize(file_path) == 0 + +# unused function + +def read_file(file_path): + try: + with open(file_path, 'r') as file: + return file.read() + except FileNotFoundError as e: + logging.error(f"File not found: {e}") + raise e + except IOError as e: + logging.error(f"IO error: {e}") + raise e + +def write_file(file_path, content): + try: + with open(file_path, 'w') as file: + file.write(content) + except IOError as e: + logging.error(f"IO error: {e}") + raise e + \ No newline at end of file diff --git a/logging_config.py b/logging_config.py new file mode 100644 index 0000000..4a9b10c --- /dev/null +++ b/logging_config.py @@ -0,0 +1,82 @@ +import logging +from logging.handlers import TimedRotatingFileHandler +import os +from pathlib import Path +from dotenv import load_dotenv + +load_dotenv() + +# Custom log level (optional). Keep for backward compatibility. +CUSTOM_ERROR_LEVEL = 35 +logging.addLevelName(CUSTOM_ERROR_LEVEL, "CUSTOM_ERROR") + + +def custom_error(self, message, *args, **kwargs): + """Helper to log at the custom error level.""" + if self.isEnabledFor(CUSTOM_ERROR_LEVEL): + self._log(CUSTOM_ERROR_LEVEL, message, args, **kwargs) + +# Attach helper to the Logger class so callers can do: logging.getLogger().custom_error(...) +logging.Logger.custom_error = custom_error + + +def _ensure_dir_for_file(path: str): + """Ensure the parent directory for `path` exists.""" + Path(path).resolve().parent.mkdir(parents=True, exist_ok=True) + + +def _create_timed_handler(path: str, level=None, when='midnight', interval=1, backupCount=7, fmt=None): + """ + Create and configure a TimedRotatingFileHandler. + Uses the handler's built-in rotation logic which is more robust and easier + to maintain than a custom doRollover implementation. 
+ """ + _ensure_dir_for_file(path) + handler = TimedRotatingFileHandler(path, when=when, interval=interval, backupCount=backupCount, encoding='utf-8') + # Use a readable suffix for rotated files (handler will append this after the filename) + handler.suffix = "%Y%m%d_%H%M%S" + if fmt: + handler.setFormatter(fmt) + if level is not None: + handler.setLevel(level) + return handler + + +def setup_logging(): + """ + Configure logging for the application and return (logger, info_handler, error_handler, warning_handler). + This version uses standard TimedRotatingFileHandler to keep the logic simple and + avoid fragile file-renaming on Windows. + """ + # Select a format depending on environment for easier debugging in dev + if os.getenv('ACH_ENV') == 'development': + log_formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s - %(pathname)s:%(lineno)d') + elif os.getenv('ACH_ENV') == 'production': + log_formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') + else: + log_formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s') + + error_log_path = os.getenv('ERROR_LOG_FILE_PATH', "./logs/ACH_media_import_errors.log") + warning_log_path = os.getenv('WARNING_LOG_FILE_PATH', "./logs/ACH_media_import_warnings.log") + info_log_path = os.getenv('INFO_LOG_FILE_PATH', "./logs/ACH_media_import_info.log") + + # Create three handlers: info (all), warning (warning+), error (error+) + info_handler = _create_timed_handler(info_log_path, level=logging.INFO, fmt=log_formatter, backupCount=int(os.getenv('LOG_BACKUP_COUNT', '7'))) + warning_handler = _create_timed_handler(warning_log_path, level=logging.WARNING, fmt=log_formatter, backupCount=int(os.getenv('LOG_BACKUP_COUNT', '7'))) + error_handler = _create_timed_handler(error_log_path, level=logging.ERROR, fmt=log_formatter, backupCount=int(os.getenv('LOG_BACKUP_COUNT', '7'))) + + console_handler = logging.StreamHandler() + 
console_handler.setFormatter(log_formatter) + + # Configure root logger explicitly + root_logger = logging.getLogger() + root_logger.setLevel(logging.INFO) + # Clear existing handlers to avoid duplicate logs when unit tests or reloads occur + root_logger.handlers = [] + root_logger.addHandler(info_handler) + root_logger.addHandler(warning_handler) + root_logger.addHandler(error_handler) + root_logger.addHandler(console_handler) + + # Return the root logger and handlers (so callers can do manual rollovers if they truly need to) + return root_logger, info_handler, error_handler, warning_handler \ No newline at end of file diff --git a/main.py b/main.py new file mode 100644 index 0000000..466c081 --- /dev/null +++ b/main.py @@ -0,0 +1,503 @@ +# v20251103 - Main script to import media files from S3 to the database +import logging +import time +from datetime import datetime +import pytz +import os +import re +from logging_config import setup_logging, CUSTOM_ERROR_LEVEL +from email_utils import handle_error, send_email_with_attachment +from s3_utils import create_s3_client, list_s3_bucket, parse_s3_files +from error_handler import handle_general_error, handle_file_not_found_error, handle_value_error +from file_utils import is_file_empty +from db_utils import count_files, get_distinct_filenames_from_db +from dotenv import load_dotenv +import config +import psycopg2 + +load_dotenv() + +import re +import logging +import os + +def analyze_pattern_match(text, description): + """Analyze which part of the 12-char pattern is not matching. + + The code currently truncates base/folder names to the first 12 characters and + uses the pattern r'^[VA][OC]-[A-Z0-9]{3}-\d{5}$' which is 12 characters long. + This function therefore validates a 12-character string and avoids + indexing beyond its length. 
+ """ + if not text: + return [f"{description}: Empty or None text"] + + issues = [] + expected_length = 12 # Pattern: [VA][OC]-[3chars]-[5digits] + + # Check length + if len(text) != expected_length: + issues.append(f"Length mismatch: expected {expected_length}, got {len(text)}") + return issues + + # Step 1: Check 1st character - V or A + if text[0] not in ['V', 'A']: + issues.append(f"Position 1: Expected [V,A], got '{text[0]}'") + + # Step 2: Check 2nd character - O or C + if text[1] not in ['O', 'C']: + issues.append(f"Position 2: Expected [O,C], got '{text[1]}'") + + # Step 3: Check 3rd character - dash + if text[2] != '-': + issues.append(f"Position 3: Expected '-', got '{text[2]}'") + + # Step 4: Check positions 4,5,6 - [A-Z0-9] + for i in range(3, 6): + if not re.match(r'^[A-Z0-9]$', text[i]): + issues.append(f"Position {i+1}: Expected [A-Z0-9], got '{text[i]}'") + + # Step 5: Check 7th character - dash + if text[6] != '-': + issues.append(f"Position 7: Expected '-', got '{text[6]}'") + + # Step 6: Check positions 8-12 - digits + for i in range(7, 12): + if not text[i].isdigit(): + issues.append(f"Position {i+1}: Expected digit, got '{text[i]}'") + + return issues + +# MAIN PROCESS +def main_process(aws_config, db_config, ach_config, bucket_name, ach_variables): + # import global variables + #from config import load_config, aws_config, db_config, ach_config, bucket_name + #global aws_config, db_config, ach_config, bucket_name + #config import load_config , aws_config, db_config, ach_config, bucket_name + #load_config() + + logging.info(f"bucket_name: {bucket_name}") + + # Ensure timing variables are always defined so later error-email logic + # won't fail if an exception is raised before end_time/elapsed_time is set. 
+ start_time = time.time() + # IN HUMAN READABLE FORMAT + logging.info(f"Process started at {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(start_time))}") + end_time = start_time + elapsed_time = 0.0 + + try: + logging.info("Starting the main process...") + + # Helper to make spaces visible in filenames for logging (replace ' ' with open-box char) + def _visible_spaces(name: str) -> str: + try: + return name.replace(' ', '\u2423') + except Exception: + return name + + # Create the S3 client + s3_client = create_s3_client(aws_config) + + # List S3 bucket contents + list_s3_files = list_s3_bucket(s3_client, bucket_name) + + # Define valid extensions and excluded folders + valid_extensions = {'.mp3', '.mp4', '.md5', '.json', '.pdf'} + # excluded_folders = {'DOCUMENTAZIONE_FOTOGRAFICA/', 'TEST-FOLDER-DEV/', 'FILE/'} + excluded_folders = {'DOCUMENTAZIONE_FOTOGRAFICA/', 'TEST-FOLDER-DEV/'} + # included_folders = {'FILE/'} # uncomment this to NOT use excluded folders + # included_folders = {'TEST-FOLDER-DEV/'} # uncomment this to NOT use excluded folders + + # Extract and filter file names + + # s3_file_names: include only files that match valid extensions and + # (if configured) whose key starts with one of the included_folders. + # We still skip any explicitly excluded_folders. Guard against the + # case where `included_folders` isn't defined to avoid NameError. 
+ try: + use_included = bool(included_folders) + except NameError: + use_included = False + + if use_included: + s3_file_names = [ + content['Key'] for content in list_s3_files + if any(content['Key'].endswith(ext) for ext in valid_extensions) + and any(content['Key'].startswith(folder) for folder in included_folders) + ] + logging.info(f"Using included_folders filter: {included_folders}") + else: + s3_file_names = [ + content['Key'] for content in list_s3_files + if any(content['Key'].endswith(ext) for ext in valid_extensions) + and not any(content['Key'].startswith(folder) for folder in excluded_folders) + ] + logging.info("Using excluded_folders filter") + + # check inventory code syntax + # first check s3_file_names if the file base name and folder name match pattern = r'^[VA][OC]-[A-Z0-9]{3}-\d{5}_\d{2}$' + pattern = r'^[VA][OC]-[A-Z0-9]{3}-\d{5}$' + contents = [] + + for s3file in s3_file_names: + # s3_file_names contains the object keys (strings), not dicts. + base_name = os.path.basename(s3file) + # keep only first 12 chars + base_name = base_name[:12] + logging.info(f"Base name: {base_name}") + folder_name = os.path.dirname(s3file) + # keep only first 12 chars of folder name as well + # folder_name = folder_name[:12] + # logging.info(f"Folder name: {folder_name}") + + if re.match(pattern, base_name): # and re.match(pattern, folder_name): + logging.info(f"File {base_name} matches pattern.") + contents.append(s3file) + else: + # Check base name + if not re.match(pattern, base_name): + base_issues = analyze_pattern_match(base_name, "Base name") + logging.warning(f"Base name '{base_name}' does not match pattern. 
Issues: {base_issues}") + + logging.warning(f"File {base_name} in folder {folder_name} does not match pattern.") + + + # filter_s3_files_not_in_db + # --- Get all DB filenames in one call --- + db_file_names = get_distinct_filenames_from_db() + + # --- Keep only those not in DB --- + # Additionally, if the DB already contains a sidecar record for the + # same basename (for extensions .md5, .json, .pdf), skip the S3 file + # since the asset is already represented in the DB via those sidecars. + sidecar_exts = ('.md5', '.json', '.pdf') + db_sidecar_basenames = set() + for dbf in db_file_names: + for ext in sidecar_exts: + if dbf.endswith(ext): + db_sidecar_basenames.add(dbf[:-len(ext)]) + break + + file_names = [] + for f in contents: + # exact key present in DB -> skip + if f in db_file_names: + continue + # strip extension to get basename and skip if DB has sidecar for it + base = os.path.splitext(f)[0] + if base in db_sidecar_basenames: + # logging.info("Skipping %s because DB already contains sidecar for basename %s", _visible_spaces(f), _visible_spaces(base)) + continue + file_names.append(f) + + # Print the total number of files + total_files_s3 = len(contents) + logging.info(f"Total number of the valid (mp3,mp4,md5,json,pdf) files in the S3 bucket before DB filter: {total_files_s3}") + total_files = len(file_names) + logging.info(f"Total number of the valid (mp3,mp4,md5,json,pdf) files after DB filter: {total_files}") + + # Count files with .mp4 and .mp3 extensions + mp4_count = sum(1 for file in s3_file_names if file.endswith('.mp4')) + mp3_count = sum(1 for file in s3_file_names if file.endswith('.mp3')) + md5_count = sum(1 for file in s3_file_names if file.endswith('.md5')) + pdf_count = sum(1 for file in s3_file_names if file.endswith('.pdf')) + json_count = sum(1 for file in s3_file_names if file.endswith('.json')) + mov_count = sum(1 for file in s3_file_names if file.endswith('.mov')) + # jpg_count = sum(1 for file in file_names if file.endswith('.jpg')) 
+ + # file directory + avi_count = sum(1 for file in s3_file_names if file.endswith('.avi')) + m4v_count = sum(1 for file in s3_file_names if file.endswith('.m4v')) + # Log the counts + # Get the logger instance + logger = logging.getLogger() + + # Use the logger instance to log custom info + logging.info("Number of .mp4 files on S3 bucket (%s): %s", bucket_name, mp4_count) + logging.info("Number of .mp3 files on S3 bucket (%s): %s", bucket_name, mp3_count) + logging.info("Number of .md5 files on S3 bucket (%s): %s", bucket_name, md5_count) + logging.info("Number of .pdf files on S3 bucket (%s): %s", bucket_name, pdf_count) + logging.info("Number of .json files on S3 bucket (%s): %s", bucket_name, json_count) + logging.info("Number of .mov files on S3 bucket (%s): %s", bucket_name, mov_count) + # logging.info(f"Number of .jpg files: {jpg_count}") + + # If ACH_SAFE_RUN is 'false' we enforce strict mp4/pdf parity and abort + # when mismatched. Default is 'true' which skips this abort to allow + # safer runs during testing or manual reconciliation. + if os.getenv('ACH_SAFE_RUN', 'true') == 'true': + if mp4_count != pdf_count: + logging.error("Number of .mp4 files is not equal to number of .pdf files") + # MOD 20251103 + # add a check to find the missing pdf or mp4 files and report them + # use file_names to find missing files + # store tuples (source_file, expected_counterpart) for clearer logging + missing_pdfs = [] # list of (mp4_file, expected_pdf) + missing_mp4s = [] # list of (pdf_file, expected_mp4) + for file in file_names: + if file.endswith('.mp4'): + # remove extension + base_name = file[:-4] # keeps any path prefix + # if the mp4 is an H264 variant (e.g. 
name_H264.mp4) remove the suffix + if base_name.endswith('_H264'): + base_name = base_name[:-5] + expected_pdf = base_name + '.pdf' + if expected_pdf not in file_names: + missing_pdfs.append((file, expected_pdf)) + elif file.endswith('.pdf'): + # Normalize base name and accept either the regular mp4 or the _H264 variant. + base_name = file[:-4] + expected_mp4 = base_name + '.mp4' + h264_variant = base_name + '_H264.mp4' + # If neither the regular mp4 nor the H264 variant exists, report missing. + if expected_mp4 not in file_names and h264_variant not in file_names: + missing_mp4s.append((file, expected_mp4)) + # report missing files + if missing_pdfs: + logging.error("Missing .pdf files (mp4 -> expected pdf):") + for mp4_file, expected_pdf in missing_pdfs: + logging.error("%s -> %s", _visible_spaces(mp4_file), _visible_spaces(expected_pdf)) + + if missing_mp4s: + logging.error("Missing .mp4 files (pdf -> expected mp4):") + for pdf_file, expected_mp4 in missing_mp4s: + logging.error("%s -> %s", _visible_spaces(pdf_file), _visible_spaces(expected_mp4)) + + logging.error("Abort Import Process due to missing files") + raise ValueError("Inconsistent file counts mp4 vs pdf") + + if mp3_count + mp4_count != json_count: + logging.error("Number of .mp3 files + number of .mp4 files is not equal to number of .json files") + logging.error("Abort Import Process due to missing files") + # search wich file dont match TODO + raise ValueError("Inconsistent file counts mp3+mp4 vs json") + + if mp3_count + mp4_count != md5_count: + logging.error("Number of .mp3 files + number of .mp4 files is not equal to number of .md5 files") + logging.error("Abort Import Process due to missing files") + # search wich file dont match TODO + raise ValueError("Inconsistent file counts mp3+mp4 vs md5") + + # Try to parse S3 files + try: + # If DRY RUN is set to True, the files will not be uploaded to the database + if os.getenv('ACH_DRY_RUN', 'true') == 'false': + uploaded_files_count, 
warning_files_count, error_files_count = parse_s3_files(s3_client, file_names, ach_variables, excluded_folders) + else: + logging.warning("DRY RUN is set to TRUE - No files will be added to the database") + # set the tuples to zero + uploaded_files_count, warning_files_count, error_files_count = (0, 0, 0) + logging.info("Total number of files (mp3+mp4) with warnings: %s. (Probably already existing in the DB)", warning_files_count) + logging.info("Total number of files with errors: %s", error_files_count) + logging.info("Total number of files uploaded: %s", uploaded_files_count) + logging.info("All files parsed") + except Exception as e: + logging.error(f"An error occurred while parsing S3 files: {e}") + handle_general_error(e) + + # Check results + # connect to database + conn = psycopg2.connect(**db_config) + cur = conn.cursor() + # function count_files that are wav and mov in db + # Map file extensions (include leading dot) to mime types + EXTENSION_MIME_MAP = { + '.avi': 'video/x-msvideo', + '.mov': 'video/mov', + '.wav': 'audio/wav', + '.mp4': 'video/mp4', + '.m4v': 'video/mp4', + '.mp3': 'audio/mp3', + '.mxf': 'application/mxf', + '.mpg': 'video/mpeg', + } + + # populate mime_type list with all relevant MediaInfo/MIME values + mime_type = [ + 'video/x-msvideo', # .avi + 'video/mov', # .mov + 'audio/wav', # .wav + 'video/mp4', # .mp4, .m4v + 'audio/mp3', # .mp3 + 'application/mxf', # .mxf + 'video/mpeg', # .mpg + ] + + logging.info(f"Mime types for counting files: {mime_type}") + + all_files_on_db = count_files(cur, mime_type,'*', False) + mov_files_on_db = count_files(cur,['video/mov'],'.mov', False ) + mxf_files_on_db = count_files(cur,['application/mxf'],'.mxf', False ) + mpg_files_on_db = count_files(cur,['video/mpeg'],'.mpg', False ) + avi_files_on_db = count_files(cur,['video/x-msvideo'],'.avi', False ) + m4v_files_on_db = count_files(cur,['video/mp4'],'.m4v', False ) + mp4_files_on_db = count_files(cur,['video/mp4'],'.mp4', False ) + wav_files_on_db = 
count_files(cur,['audio/wav'],'.wav', False ) + mp3_files_on_db = count_files(cur,['audio/mp3'],'.mp3', False ) + + # mov + m4v + avi + mxf + mpg + logging.info(f"Number of all video files in the database: {all_files_on_db}") + logging.info(f"Number of .mov files in the database: {mov_files_on_db} and S3: {mov_count} ") + logging.info(f"Number of .mp4 files in the database: {mp4_files_on_db} and S3: {mp4_count}") + + # compare the mp4 name and s3 name and report the missing files in the 2 lists a print the list + missing_mp4s = [f for f in file_names if f.endswith('.mp4') and f not in db_file_names] + # if missing_mp4s empty do not return a warning + if missing_mp4s: + logging.warning(f"Missing .mp4 files in DB compared to S3: {missing_mp4s}") + + logging.info(f"Number of .wav files in the database: {wav_files_on_db} ") + logging.info(f"Number of .mp3 files in the database: {mp3_files_on_db} and S3: {mp3_count}") + + missing_mp3s = [f for f in file_names if f.endswith('.mp3') and f not in db_file_names] + # if missing_mp3s empty do not return a warning + if missing_mp3s: + logging.warning(f"Missing .mp3 files in DB compared to S3: {missing_mp3s}") + + logging.info(f"Number of .avi files in the database: {avi_files_on_db} ") + logging.info(f"Number of .m4v files in the database: {m4v_files_on_db} ") + logging.info(f"Number of .mxf files in the database: {mxf_files_on_db} ") + logging.info(f"Number of .mpg files in the database: {mpg_files_on_db} ") + + logging.info(f"Total file in s3 before import {total_files}") + + # time elapsed + end_time = time.time() # Record end time + + elapsed_time = end_time - start_time + logging.info(f"Processing completed. 
Time taken: {elapsed_time:.2f} seconds") + + except FileNotFoundError as e: + handle_file_not_found_error(e) + except ValueError as e: + handle_value_error(e) + except Exception as e: + handle_general_error(e) + + # Send Email with logs if success or failure + # Define the CET timezone + cet = pytz.timezone('CET') + + # Helper to rename a log file by appending a timestamp and return the new path. + def _rename_log_if_nonempty(path): + try: + if not path or not os.path.exists(path): + return None + # If file is empty, don't attach/rename it + if os.path.getsize(path) == 0: + return None + dir_name = os.path.dirname(path) + base_name = os.path.splitext(os.path.basename(path))[0] + timestamp = datetime.now(cet).strftime("%Y%m%d_%H%M%S") + new_log_path = os.path.join(dir_name, f"{base_name}_{timestamp}.log") + # Attempt to move/replace atomically where possible + try: + os.replace(path, new_log_path) + except Exception: + # Fallback to rename (may raise on Windows if target exists) + os.rename(path, new_log_path) + return new_log_path + except Exception as e: + logging.error("Failed to rename log %s: %s", path, e) + return None + + # close logging to flush handlers before moving files + logging.shutdown() + + logging.info("Preparing summary email") + + error_log = './logs/ACH_media_import_errors.log' + warning_log = './logs/ACH_media_import_warning.log' + + # Determine presence of errors/warnings + has_errors = False + has_warnings = False + try: + if os.path.exists(error_log) and os.path.getsize(error_log) > 0: + with open(error_log, 'r', encoding='utf-8', errors='ignore') as f: + content = f.read() + if 'ERROR' in content or len(content.strip()) > 0: + has_errors = True + if os.path.exists(warning_log) and os.path.getsize(warning_log) > 0: + with open(warning_log, 'r', encoding='utf-8', errors='ignore') as f: + content = f.read() + if 'WARNING' in content or len(content.strip()) > 0: + has_warnings = True + except Exception as e: + logging.error("Error while reading 
log files: %s", e) + + # from env - split safely and strip whitespace + def _split_env_list(name): + raw = os.getenv(name, '') + return [s.strip() for s in raw.split(',') if s.strip()] + + EMAIL_RECIPIENTS = _split_env_list('EMAIL_RECIPIENTS') + ERROR_EMAIL_RECIPIENTS = _split_env_list('ERROR_EMAIL_RECIPIENTS') or EMAIL_RECIPIENTS + SUCCESS_EMAIL_RECIPIENTS = _split_env_list('SUCCESS_EMAIL_RECIPIENTS') or EMAIL_RECIPIENTS + + # Choose subject and attachment based on severity + if has_errors: + subject = "ARKIVO Import of Video/Audio Ran with Errors" + attachment_to_send = _rename_log_if_nonempty(error_log) or error_log + body = "Please find the attached error log file. Job started at %s and ended at %s, taking %.2f seconds." % ( + datetime.fromtimestamp(start_time).strftime('%Y-%m-%d %H:%M:%S'), + datetime.fromtimestamp(end_time).strftime('%Y-%m-%d %H:%M:%S'), + elapsed_time + ) + email_recipients=ERROR_EMAIL_RECIPIENTS + elif has_warnings: + subject = "ARKIVO Import of Video/Audio Completed with Warnings" + # Attach the warnings log for investigation + attachment_to_send = _rename_log_if_nonempty(warning_log) or warning_log + body = "The import completed with warnings. Please find the attached warning log. Job started at %s and ended at %s, taking %.2f seconds." % ( + datetime.fromtimestamp(start_time).strftime('%Y-%m-%d %H:%M:%S'), + datetime.fromtimestamp(end_time).strftime('%Y-%m-%d %H:%M:%S'), + elapsed_time + ) + email_recipients=ERROR_EMAIL_RECIPIENTS + else: + subject = "ARKIVO Video/Audio Import Completed Successfully" + # No attachment for clean success + attachment_to_send = None + body = "The import of media (video/audio) completed successfully without any errors or warnings. Job started at %s and ended at %s, taking %.2f seconds." 
% ( + datetime.fromtimestamp(start_time).strftime('%Y-%m-%d %H:%M:%S'), + datetime.fromtimestamp(end_time).strftime('%Y-%m-%d %H:%M:%S'), + elapsed_time + ) + email_recipients=SUCCESS_EMAIL_RECIPIENTS + + logging.info("Sending summary email: %s (attach: %s)", subject, bool(attachment_to_send)) + + # Send email + try: + send_email_with_attachment( + subject=subject, + body=body, + attachment_path=attachment_to_send, + email_recipients=email_recipients + ) + except Exception as e: + logging.error("Failed to send summary email: %s", e) + + return + + + +if __name__ == "__main__": + try: + # Setup logging using standard TimedRotatingFileHandler handlers. + # No manual doRollover calls — rely on the handler's built-in rotation. + logger, rotating_handler, error_handler, warning_handler = setup_logging() + + # Load configuration settings + aws_config, db_config, ach_config, bucket_name, ach_variables = config.load_config() + + logging.info("Config loaded, logging setup done") + + # Run the main process + main_process(aws_config, db_config, ach_config, bucket_name, ach_variables) + + logging.info("Main process completed at: %s", datetime.now().strftime('%Y-%m-%d %H:%M:%S')) + + except Exception as e: + logging.error(f"An error occurred: {e}") \ No newline at end of file diff --git a/query-sql.md b/query-sql.md new file mode 100644 index 0000000..20a8879 --- /dev/null +++ b/query-sql.md @@ -0,0 +1,82 @@ +### Find records where the original filename indicates an H264 variant + +-- Purpose: list distinct base records that have an "original" filename matching the +-- pattern FILE%_H264% (i.e. files stored under the FILE... folder or beginning with +-- "FILE" and containing the "_H264" marker). This helps locate master records that +-- also have an H264-derived file present. 
+-- Columns returned: +-- base_id : the parent/base record id (h_base_record_id) +-- file_type : the logical file type extracted from the JSON `file_type` column +-- original_file_name: the stored original filename (may include folder/prefix) +-- digital_file_name : the current digital filename in the database +SELECT DISTINCT + h_base_record_id AS base_id, + file_type ->> 'type' AS file_type, + original_file_name, + digital_file_name +FROM file +WHERE file_type ->> 'type' IS NOT NULL + AND original_file_name LIKE 'FILE%_H264%'; + +### Audio files (mp3) that are not in the FILE/ folder + +-- Purpose: find distinct base records for streaming audio (.mp3) where the original +-- filename is not located in the FILE/... area. Useful to separate ingest/original +-- conservative copies (often under FILE/) from streaming or derivative objects. +SELECT DISTINCT + h_base_record_id AS base_id, + file_type ->> 'type' AS file_type +FROM file +WHERE file_type ->> 'type' IS NOT NULL + AND original_file_name NOT LIKE 'FILE%' + AND digital_file_name LIKE '%mp3'; + +### Video files (mp4) that are not in the FILE/ folder + +-- Purpose: same as the mp3 query but for mp4 streaming/derivative files. This helps +-- identify which base records currently have mp4 derivatives recorded outside the +-- FILE/... (master) namespace. +SELECT DISTINCT + h_base_record_id AS base_id, + file_type ->> 'type' AS file_type +FROM file +WHERE file_type ->> 'type' IS NOT NULL + AND original_file_name NOT LIKE 'FILE%' + AND digital_file_name LIKE '%mp4'; + +### Records with non-image digital files + +-- Purpose: list base records that have digital files which are not JPEG images. The +-- `NOT LIKE '%jpg'` filter excludes typical image derivatives; this is useful for +-- auditing non-image assets attached to records. 
+SELECT DISTINCT + h_base_record_id AS base_id, + file_type ->> 'type' AS file_type +FROM file +WHERE file_type ->> 'type' IS NOT NULL + AND original_file_name NOT LIKE 'FILE%' + AND digital_file_name NOT LIKE '%jpg'; + +### Count of unique base records per file_type + +-- Purpose: aggregate the number of distinct base records (h_base_record_id) associated +-- with each `file_type` value. This gives an overview of how many unique objects have +-- files recorded for each logical file type. +SELECT + file_type ->> 'type' AS file_type, + COUNT(DISTINCT h_base_record_id) AS file_type_unique_record_count +FROM file +WHERE file_type ->> 'type' IS NOT NULL +GROUP BY file_type ->> 'type'; + +### Duplicate of the previous aggregate (kept for convenience) + +-- Note: the query below is identical to the one above and will produce the same +-- counts; it may be intentional for running in a separate context or as a copy-and-paste +-- placeholder for further edits. +SELECT + file_type ->> 'type' AS file_type, + COUNT(DISTINCT h_base_record_id) AS file_type_unique_record_count +FROM file +WHERE file_type ->> 'type' IS NOT NULL +GROUP BY file_type ->> 'type'; \ No newline at end of file diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..f913ba5 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,8 @@ +boto3 >= 1.35.39 +botocore >= 1.35.39 +psycopg2-binary>=2.9.9 +aiohttp >= 3.10.10 +asyncio >= 3.4.3 +python-dotenv >= 1.0.1 +email-validator >= 2.2.0 +pytz >= 2024.2 \ No newline at end of file diff --git a/s3_utils.py b/s3_utils.py new file mode 100644 index 0000000..b9c2c42 --- /dev/null +++ b/s3_utils.py @@ -0,0 +1,326 @@ +import boto3 # for S3 +from botocore.exceptions import NoCredentialsError, PartialCredentialsError, ClientError # for exceptions +import logging # for logging +import json # for json.loads +import os # for os.path +import psycopg2 # for PostgreSQL + +# Import custom modules +from file_utils import retrieve_file_contents, 
check_related_files, extract_and_validate_file_info # for file operations +from email_utils import handle_error # for error handling depecradted? +from db_utils import get_db_connection, check_inventory_in_db, check_objkey_in_file_db, add_file_record_and_relationship, retrieve_digital_file_names # for database operations + +import config + +# Function to check the existence of related files and validate in PostgreSQL +def parse_s3_files( s3, s3_files, ach_variables, excluded_folders=[]): + """ + Parses the S3 files and performs various operations on them. + Args: + s3 (S3): The S3 object for accessing S3 services. + s3_files (list): The list of S3 files to be processed. + Returns: + None + Raises: + FileNotFoundError: If a required file is not found in S3. + ValueError: If a file has zero size or if the file type is unsupported. + Exception: If any other unexpected exception occurs. + """ + # Load the configuration from the .env file + _ , db_config, _, bucket_name, _ = config.load_config() + # logg ach_variables + logging.info(f"ach_variables: {ach_variables}") + + logging.info(f"Starting to parse S3 files from bucket {bucket_name}...") + + try: + logging.info(f"Starting to parse S3 files from bucket {bucket_name}...") + # Ensure db_config is not None + if db_config is None: + raise ValueError("Database configuration is not loaded") + # return # Exit the function if db_config is None + + conn = psycopg2.connect(**db_config) + cur = conn.cursor() + + # Filter files with the desired prefix + # excluded_prefix = ['TEST-FOLDER-DEV/', 'DOCUMENTAZIONE_FOTOGRAFICA/', 'BTC/', 'VHS/', 'UMT/', 'OV2/', 'OA4/'] + excluded_prefix = excluded_folders + # Exclude files that start with any prefix in the excluded_prefix array + filtered_files = [file for file in s3_files if not any(file.startswith(prefix) for prefix in excluded_prefix)] + + # Filter files with the desired prefix + # DEBUG : filtered_files = [file for file in s3_files if file.startswith('TestFolderDev/')] + # Length 
of filtered files + logging.info(f"Array Length of filtered files: {len(filtered_files)}") + # Counters + error_files_count = 0 + warning_files_count = 0 + uploaded_files_count = 0 + #for file in s3_files: + for file in filtered_files: + if file.endswith(('.mp4', '.mp3')): # Check for both .mp4 and .mp3 + logging.info("Processing file: %s in the bucket: %s", file, bucket_name) + # check if file exists in db + result = check_objkey_in_file_db( cur, file) + # Check the result and proceed accordingly + if result: + # logging.warning(f"File {file} already exists in the database.") + warning_files_count +=1 + continue + + ach_variables['file_fullpath'] = file # is the Object key + ach_variables['inventory_code'] = os.path.splitext(os.path.basename(file))[0][:12] + logging.info(f"ach_variables['inventory_code'] {ach_variables['inventory_code']}: {file}") + # Extract the file extension + ach_variables['objectKeys']['media'] = file + ach_variables['objectKeys']['pdf'] = f"{os.path.splitext(file)[0]}.pdf" + ach_variables['objectKeys']['pdf'] = ach_variables['objectKeys']['pdf'].replace('_H264', '') + if file.endswith('.mp4'): + ach_variables['objectKeys']['conservative_copy'] = f"{os.path.splitext(file)[0]}.mov" # remove _H264 is done later + elif file.endswith('.mp3'): + ach_variables['objectKeys']['conservative_copy'] = f"{os.path.splitext(file)[0]}.wav" + else: + logging.KeyError(f"Unsupported file type: {file}") + error_files_count +=1 + continue + + # Extract the file extension + file_extension = os.path.splitext(file)[1] + ach_variables['extension'] = file_extension # Store the file extension in ach_variables + logging.info(f"the file File extension: {file_extension}") + + # Extract the file name with directory part + file_name_with_path = os.path.splitext(file)[0] # Remove the extension but keep path + logging.info(f"File name with path: {file_name_with_path}") + + # Extract the base name from the file name + base_name = os.path.basename(file_name_with_path) # 
Extract the base name with path removed + logging.info(f"Base name: {base_name}") + + # Apply _H264 removal only for .mp4 files + if file.endswith('.mp4'): + logging.info(f"File is an mp4 file: {file}. remove _H264") + base_name = base_name.replace('_H264', '') + file_name_with_path = file_name_with_path.replace('_H264', '') + logging.info(f"Modified base name for mp4: {base_name}") + logging.info(f"Modified file name with path for mp4: {file_name_with_path}") + + try: + # Retrieve and log the file size + file_size = get_file_size(s3, bucket_name, file) + # maybe can trow an error inside te get_file_size function and catch it here + if file_size is not None: + ach_variables['media_disk_size'] = file_size + logging.info(f"The media file disk size is: {ach_variables['media_disk_size']}") + else: + logging.warning("Could not retrieve file size for %s.", file) + warning_files_count +=1 + continue # Skip to the next file in the loop + + logging.info("Start Validating files for %s...", base_name) + # Check if related file exist and retreive .pdf file size + try: + # Check if the required files exist in S3 + ach_variables['pdf_disk_size'] = check_related_files(s3, file_name_with_path, file, bucket_name) + logging.info(f"PDF disk size: {ach_variables['pdf_disk_size']}") + except FileNotFoundError as e: + # Handle case where the file is not found + logging.error(f"File not found error: {e}") + error_files_count +=1 + continue # Move on to the next file in the loop + except ValueError as e: + # Handle value errors + logging.error(f"Value error: {e} probabli filesize zero") + error_files_count +=1 + continue # Move on to the next file in the loop + except PermissionError as e: + # Handle permission errors + logging.error(f"Permission error: {e}") + error_files_count +=1 + continue # Move on to the next file in the loop + except Exception as e: + # Handle any other exceptions + logging.error(f"An error occurred: {e}") + + # Retrieve the file contents for related files: .md5, 
.json + try: + # Check if the file exists in S3 and retrieve file contents + logging.info(f"Retrieving file contents for {file_name_with_path}...") + file_contents = retrieve_file_contents(s3, f"{file_name_with_path}") + except Exception as e: + # Log the error + logging.error(f"Error retrieving file contents for {file_name_with_path}: {e}") + file_contents = None # Set file_contents to None or handle it as needed + error_files_count +=1 + continue # Move on to the next file in the loop + + # if contents dont exists + if file_contents is None: + logging.error(f"Error retrieving file contents for {file}.") + error_files_count +=1 + continue # Move on to the next file in the loop + + # Ensure file_contents is a dictionary + if isinstance(file_contents, str): + file_contents = json.loads(file_contents) + + # Extract and validate file information + ach_variables['custom_data_in'], ach_variables['disk_size'], ach_variables['conservative_copy_extension'] = extract_and_validate_file_info(file_contents, file, ach_variables) + logging.info(f"Custom data extracted: {ach_variables['custom_data_in']}") + logging.info(f"Disk size extracted: {ach_variables['disk_size']}") + logging.info(f"Conservative copy extension extracted: {ach_variables['conservative_copy_extension']}") + logging.info(f"File {file} file validation completed") + except Exception as e: + logging.error(f"Error processing file {file}: {e}") + error_files_count +=1 + continue # Move on to the next file in the loop + + # no need truncate base name on this point + # base_name = base_name[:12] # Truncate the base name to 12 characters + # //////////////////////////////////////////////////////////////////////////////// + + # Check if the base name exists in the database + logging.info(f"Checking database for {base_name}...") + + try: + # Call the function to check the inventory code in the database and get the result + result, truncated_base_name = check_inventory_in_db(s3, cur, base_name) + logging.info(f"base name 
{base_name}, truncated_base_name: {truncated_base_name}") + # Check the result and proceed accordingly + if result: + logging.info(f"Inventory code {base_name} found in the database.") + # Call the function to retrieve digital file names + if retrieve_digital_file_names(s3, cur, base_name, ach_variables['objectKeys']['media']) == True: + # Call the function to add a file record and its relationship to the support record + # logging.info(f"ach_variables: {ach_variables}") + add_file_record_and_relationship(s3, cur, base_name, ach_variables) + else: + logging.warning(f"File record already exists for {base_name}.") + warning_files_count +=1 + continue + else: + logging.error(f"Inventory code {base_name} not found in the database.") + error_files_count +=1 + continue + except ValueError as e: + logging.error(f"An error occurred: {e}") + error_files_count +=1 + continue + + # Commit the changes to the database + logging.info(f"commit to databse {base_name}...") + + # Commit to the database (conn, cur) only if everything is okay; otherwise, perform a rollback. 
+ conn.commit() + uploaded_files_count +=1 + cur.close() + conn.close() + except ValueError as e: + # Handle specific validation errors + logging.error(f"Validation error: {e}") + #handle_error(e) # Pass the ValueError to handle_error + raise e # Raise the exception to the calling function + except Exception as e: + # Handle any other unexpected errors + logging.error(f"Unexpected error: {e}") + #handle_error(e) # Pass unexpected errors to handle_error + raise e # Raise the exception to the calling function + + # return the file saved + return uploaded_files_count, warning_files_count, error_files_count + +# Function to create an S3 client +def create_s3_client(aws_config): + logging.info(f'Creating S3 client with endpoint: {aws_config["endpoint_url"]}') + try: + s3 = boto3.client( + 's3', + endpoint_url=aws_config['endpoint_url'], + aws_access_key_id=aws_config['aws_access_key_id'], + aws_secret_access_key=aws_config['aws_secret_access_key'], + region_name=aws_config['region_name'], + config=boto3.session.Config( + signature_version='s3v4', + s3={'addressing_style': 'path'} + ) + ) + logging.info('S3 client created successfully') + return s3 + except (NoCredentialsError, PartialCredentialsError) as e: + logging.error(f'Error creating S3 client: {e}') + raise e + +# Function to list the contents of an S3 bucket +def list_s3_bucket(s3_client, bucket_name): + try: + paginator = s3_client.get_paginator('list_objects_v2') + bucket_contents = [] + + for page in paginator.paginate(Bucket=bucket_name): + if 'Contents' in page: + bucket_contents.extend(page['Contents']) + + logging.info(f"Retrieved {len(bucket_contents)} items from the bucket.") + return bucket_contents + except ClientError as e: + logging.error(f'Error listing bucket contents: {e}') + raise e + +# Function to get file size from S3 +def get_file_size(s3_client, bucket_name, file_key): + try: + response = s3_client.head_object(Bucket=bucket_name, Key=file_key) + return response['ContentLength'] + except 
ClientError as e: + logging.error(f"Failed to retrieve file size for {file_key}: {e}") + return None # or an appropriate fallback value + except Exception as e: + logging.error(f"An unexpected error occurred: {e}") + return None + +# Function to check if a file exists in S3 +def check_file_exists_in_s3(s3, file_name,bucket_name): + """ + Checks if a file exists in an S3 bucket. + + Parameters: + - s3 (boto3.client): The S3 client object. + - file_name (str): The name of the file to check. + + Returns: + - bool: True if the file exists, False otherwise. + + Raises: + - ClientError: If there is an error checking the file. + + """ + try: + s3.head_object(Bucket=bucket_name, Key=file_name) + return True + except ClientError as e: + if e.response['Error']['Code'] == '404': + return False + else: + logging.error(f'Error checking file {file_name}: {e}') + raise e + +# Function to retrieve file contents from S3 +def upload_file_to_s3(s3_client, file_path, bucket_name, object_name=None): + if object_name is None: + object_name = file_path + try: + s3_client.upload_file(file_path, bucket_name, object_name) + logging.info(f"File {file_path} uploaded to {bucket_name}/{object_name}") + except ClientError as e: + logging.error(f'Error uploading file {file_path} to bucket {bucket_name}: {e}') + raise e + +# Function to download a file from S3 +def download_file_from_s3(s3_client, bucket_name, object_name, file_path): + try: + s3_client.download_file(bucket_name, object_name, file_path) + logging.info(f"File {object_name} downloaded from {bucket_name} to {file_path}") + except ClientError as e: + logging.error(f'Error downloading file {object_name} from bucket {bucket_name}: {e}') + raise e \ No newline at end of file diff --git a/utils.py b/utils.py new file mode 100644 index 0000000..ce52927 --- /dev/null +++ b/utils.py @@ -0,0 +1,99 @@ +# Description: Utility functions for the media validation service +# Art.c.hive 2024/09/30 + +# Standard libs +import logging +import os +# 
# import logging_config


# result, message = check_video_info(media_info)
def check_video_info(media_info):
    """Validate a MediaInfo JSON dict describing a video file.

    Two acceptance paths, selected by the parent directory of the file
    referenced in media_info['media']['@ref']:

    * parent dir 'FILE' (case-insensitive): several container types are
      accepted (.mov/.avi/.m4v/.mp4/.mxf/.mpg/.mpeg) and track 0 (the
      "General" track) must report a format matching the extension.
    * any other parent dir: the file must be a .mov whose track 1 is
      ProRes 4444 video.

    Returns:
        tuple[bool, str]: (passed, human-readable reason).
    """
    logging.info("Checking video info...")
    logging.info(f"Media info: {media_info}")
    try:
        file_name = media_info.get('media', {}).get('@ref', '')
        logging.info(f"File name in JSON: {file_name}")
        # Parent directory = one level above the basename,
        # e.g. 'SOME/FOLDER/file.mov' -> 'FOLDER'.
        parent_dir = os.path.basename(os.path.dirname(file_name))
        logging.info(f"Parent directory: {parent_dir}")

        if parent_dir.lower() == 'file':
            # Conservative-copy area: several containers are acceptable.
            allowed_exts = ('.mov', '.avi', '.m4v', '.mp4', '.mxf', '.mpg', '.mpeg')
            if not any(file_name.lower().endswith(ext) for ext in allowed_exts):
                return False, "The file is not a .mov, .avi, .m4v, .mp4, .mxf, .mpg or .mpeg file."

            # Acceptable "General" track formats per extension (MediaInfo
            # may report several names for the same container).
            general_formats = {
                '.avi': ['AVI'],
                '.mov': ['QuickTime', 'MOV', 'MPEG-4'],
                '.mp4': ['MPEG-4', 'MP4', 'QuickTime'],
                '.m4v': ['MPEG-4', 'MP4'],
                '.mxf': ['MXF'],
                '.mpg': ['MPEG', 'MPEG-PS'],
                '.mpeg': ['MPEG', 'MPEG-PS'],
            }

            file_ext = os.path.splitext(file_name)[1].lower()
            logging.info(f"File extension: {file_ext}")
            expected_formats = general_formats.get(file_ext)
            logging.info(f"Expected formats for extension {file_ext}: {expected_formats}")
            if not expected_formats:
                return False, f"Unsupported file extension: {file_ext}"

            tracks = media_info.get('media', {}).get('track', [])
            if len(tracks) > 0:
                track_0 = tracks[0]  # "General" container track
                logging.info(f"Track 0: {track_0}")
                actual_format = track_0.get('Format', '')
                if track_0.get('@type', '') == 'General' and actual_format in expected_formats:
                    logging.info(
                        f"File extension {file_ext} matches one of the expected formats "
                        f"{expected_formats} (actual: {actual_format})."
                    )
                else:
                    return False, (
                        f"Track 0 format '{actual_format}' does not match any expected "
                        f"formats {expected_formats} for extension {file_ext}."
                    )
            # NOTE(review): with no track data at all this path still falls
            # through to the success return below — confirm that is intended.
        else:
            # Outside FILE/: only .mov masters with ProRes 4444 are accepted.
            if not file_name.lower().endswith('.mov'):
                return False, "The file is not a .mov file."
            tracks = media_info.get('media', {}).get('track', [])
            if len(tracks) > 1:
                track_1 = tracks[1]  # first stream after the "General" track
                logging.info(f"Track 1: {track_1}")
                if (track_1.get('@type', '') == 'Video'
                        and track_1.get('Format', '') == 'ProRes'
                        and track_1.get('Format_Profile', '') == '4444'):
                    return True, "The file is a .mov file with ProRes format in track 1."
                else:
                    return False, "Track 1 format is not ProRes."
            else:
                return False, "No track 1 found."

        return True, "The file passed the video format checks."
    except Exception as e:
        return False, f"Error processing the content: {e}"


# result, message = check_audio_info(media_info)
def check_audio_info(media_info):
    """Validate a MediaInfo JSON dict describing an audio file.

    Accepts .wav files (extension check is case-insensitive, matching the
    checks in check_video_info) whose track 1 is PCM at 96 kHz / 24-bit.

    Returns:
        tuple[bool, str]: (passed, human-readable reason).
    """
    try:
        file_name = media_info.get('media', {}).get('@ref', '')
        # FIX: the extension test was case-sensitive ('X.WAV' was rejected)
        # while every check in check_video_info lower-cases first — now
        # consistent across both validators.
        if not file_name.lower().endswith('.wav'):
            logging.info(f"File name in JSON: {file_name}")
            return False, "The file is not a .wav file."

        tracks = media_info.get('media', {}).get('track', [])
        # Need at least two entries before reading index 1 (the audio stream).
        if len(tracks) > 1:
            track_1 = tracks[1]
            if (track_1.get('@type', '') == 'Audio'
                    and track_1.get('Format', '') == 'PCM'
                    and track_1.get('SamplingRate', '') == '96000'
                    and track_1.get('BitDepth', '') == '24'):
                return True, "The file is a .wav file with Wave format in track 1."
            else:
                return False, (
                    f"Track 1 format is not Wave. Format: {track_1.get('Format', '')}, "
                    f"SamplingRate: {track_1.get('SamplingRate', '')}, "
                    f"BitDepth: {track_1.get('BitDepth', '')}"
                )

        return False, "No track 1 found."

    except Exception as e:
        return False, f"Error processing the content: {e}"