diff --git a/.github/copilot-instructions.md b/.github/copilot-instructions.md
new file mode 100644
index 0000000..95468b4
--- /dev/null
+++ b/.github/copilot-instructions.md
@@ -0,0 +1,65 @@
+# ACH Server Media Import - Agent Instructions
+
+Guidelines and standards for the ACH Media Import project.
+
+## Project Overview
+This project is a Python-based utility that imports media files from an S3-compatible bucket into a PostgreSQL database, enforcing specific naming conventions and metadata validation.
+
+## Technical Stack
+- **Language**: Python 3.8+
+- **Database**: PostgreSQL (via `psycopg2`)
+- **Cloud Storage**: AWS S3/S3-compatible storage (via `boto3`)
+- **Containerization**: Docker & Docker Compose
+- **Environment**: Managed via `.env` and `config.py`
+
+## Architecture & Modular Design
+The project uses a utility-based modular architecture orchestrated by `main.py`.
+- [main.py](main.py): Entry point and workflow orchestrator.
+- [s3_utils.py](s3_utils.py): S3 client operations and bucket listing.
+- [db_utils.py](db_utils.py): Database connectivity and SQL execution.
+- [validation_utils.py](validation_utils.py): Pattern matching and business logic validation.
+- [logging_config.py](logging_config.py): Centralized logging configuration.
+- [error_handler.py](error_handler.py): Error handling and notifications.
+- [email_utils.py](email_utils.py): SMTP integration for alerts.
+
+## Domain Logic: Inventory Codes
+The core validation revolves around "Inventory Codes", which MUST follow a strict 12-character format:
+- `^[VA][OC]-[A-Z0-9]{3}-\d{5}$`
+- Example: `VO-UMT-14387` (the first character is `V` or `A`, the second `O` or `C`).
+- Files not matching this pattern in S3 are logged but skipped.
+
+## Development Workflows
+
+### Environment Setup
+- **Windows**: Use `. .venv\Scripts\Activate.ps1`
+- **Linux/macOS**: Use `source .venv/bin/activate`
+- **Dependency installation**: `pip install -r requirements.txt`
+
+### Local Execution
+- **Run script**: `python main.py`
+- **Verify Configuration**: Ensure `.env` is populated with `DB_`, `AWS_`, and `SMTP_` variables.
+
+### Docker Operations
+- **Build/Up**: `docker compose up -d --build`
+- **Logs**: `docker compose logs -f app`
+- **Stop**: `docker compose stop`
+
+## Coding Standards & Conventions
+
+### Logging
+- Use the custom logger from `logging_config.py`.
+- **Log Levels**: Use `logging.INFO`, `logging.WARNING`, and the custom `CUSTOM_ERROR_LEVEL` (35) via `error_handler.py`.
+- Logs are rotated and stored in the `logs/` directory.
+
+### Error Handling
+- Wrap critical operations that should trigger notifications in try-except blocks that call `error_handler.notify_error()`.
+- Avoid silent failures; ensure errors are logged to the appropriate log file.
+
+### Configuration
+- Access settings exclusively via the `config.py` module's dictionaries: `db_config`, `aws_config`, `ach_config`.
+- Never hardcode credentials or endpoints.
+
+## Related Files
+- [query-sql.md](query-sql.md): Reference for database schema and SQL logic.
+- [requirements.txt](requirements.txt): Project dependencies.
+- [docker-compose.yml](docker-compose.yml): Deployment configuration.
diff --git a/README.md b/README.md
index 39a851b..6c033eb 100644
--- a/README.md
+++ b/README.md
@@ -1,138 +1,126 @@
-# Project Setup
+# ACH Server Media Import
-## Setting up a Virtual Environment
+This repository contains a script that imports media files from an S3-compatible bucket into a PostgreSQL database. It supports both local execution (Python virtual environment) and Docker deployment via `docker-compose`.
-1. 
**Create a virtual environment:** +--- - ### For Linux/macOS: - ```bash - python3 -m venv .venv - ``` +## Overview - ### For Windows: - ```bash - ## ACH-server-import-media +### Asset hierarchy +- **Conservatory Copy (Master)**: High-quality source (e.g., `.mov`, `.wav`). This is the primary record in the database. +- **Streaming Copy (Derivative)**: Transcoded versions (`.mp4`, `.mp3`) linked to the master. +- **Sidecar Metadata (`.json`)**: Contains technical metadata (`mediainfo` / `ffprobe`) used for validation and to determine the correct MIME type. +- **Sidecar QC (`.pdf`, `.md5`)**: Quality control and checksum files. - This repository contains a script to import media files from an S3-compatible bucket into a database. It supports both local execution (virtual environment) and Docker-based deployment via `docker-compose`. +> **Important:** all files belonging to the same asset must share the same 12-character inventory code (e.g., `VO-UMT-14387`). - Contents - - `main.py` - main import script - - `docker-compose.yml` - docker-compose service for running the importer in a container - - `requirements.txt` - Python dependencies - - `config.py`, `.env` - configuration and environment variables +--- - Prerequisites - - Docker & Docker Compose (or Docker Desktop) - - Python 3.8+ - - Git (optional) +## Process Phases - Quick local setup (virtual environment) +The importer runs in three clearly separated phases (each phase is logged in detail): - Linux / macOS - ```bash - python3 -m venv .venv - source .venv/bin/activate - pip install -r requirements.txt - ``` +### Phase 1 – S3 discovery + initial validation +- List objects in the configured S3 bucket. +- Keep only allowed extensions: `.mp4`, `.mp3`, `.json`, `.pdf`, `.md5`. +- Exclude configured folders (e.g., `TEST-FOLDER-DEV/`, `DOCUMENTAZIONE_FOTOGRAFICA/`, `UMT/`). +- Validate the inventory code format and ensure the folder prefix matches the type encoded in the inventory code. 
+- Files failing validation are rejected **before** any database interaction. - Windows (PowerShell) - ```powershell - python -m venv .venv - . .venv\Scripts\Activate.ps1 - pip install -r requirements.txt - ``` +### Phase 2 – Database cross-reference + filtering +- Load existing filenames from the database. +- Skip files already represented in the DB, including sidecar records. +- Build the final list of S3 objects to parse. - Running locally - 1. Ensure your configuration is available (see `config.py` or provide a `.env` file with the environment variables used by the project). - 2. Run the script (from the project root): +### Phase 3 – Parse & insert +- Read and validate sidecars (`.json`, `.md5`, `.pdf`) alongside the media file. +- Use metadata (from `mediainfo` / `ffprobe`) to derive the **master mime type** and enforce container rules. +- Insert new records into the database (unless `ACH_DRY_RUN=true`). - Linux / macOS - ```bash - python main.py - ``` +--- - Windows (PowerShell) - ```powershell - & .venv\Scripts\python.exe main.py - ``` +## Validation Policy - Docker Compose +The import pipeline enforces strict validation to prevent bad data from entering the database. - This project includes a `docker-compose.yml` with a service named `app` (container name `ACH_server_media_importer`). The compose file reads environment variables from `.env` and mounts a `logs` named volume. +### Inventory Code & Folder Prefix +- Expected inventory code format: `^[VA][OC]-[A-Z0-9]{3}-\d{5}$`. +- The folder prefix (e.g., `BRD/`, `DVD/`, `FILE/`) must match the code type. +- If the prefix does not match the inventory code, the file is rejected in Phase 1. - Build and run (detached) - ```powershell - # Docker Compose v2 syntax (recommended) - # From the repository root +### Safe Run (`ACH_SAFE_RUN`) +- When `ACH_SAFE_RUN=true`, **any warning during Phase 3 causes an immediate abort**. 
+- This prevents partial inserts when the importer detects inconsistent or already-present data. - docker compose up -d --build +### MIME Type Determination +- The MIME type for master files is derived from the JSON sidecar metadata (`mediainfo` / `ffprobe`), not from the streaming derivative extension. - # OR if your environment uses the v1 binary - # docker-compose up -d --build - ``` +--- - Show logs - ```powershell - # Follow logs for the 'app' service - docker compose logs -f app +## Quick Start (Local) - # Or use the container name - docker logs -f ACH_server_media_importer - ``` +### Prerequisites +- Python 3.8+ +- Virtual environment support (`venv`) - Stop / start / down - ```powershell - # Stop containers - docker compose stop +### Setup - # Start again - docker compose start +```bash +python3 -m venv .venv +source .venv/bin/activate +pip install -r requirements.txt +``` - # Take down containers and network - docker compose down - ``` +### Run - Rebuild when already running +```bash +python main.py +``` - There are two safe, common ways to rebuild a service when the containers are already running: +--- - 1) Rebuild in-place and recreate changed containers (recommended for most changes): +## Docker (docker-compose) - ```powershell - # Rebuild images and recreate services in the background - docker compose up -d --build - ``` +The project includes a `docker-compose.yml` with an `app` service (container name `ACH_server_media_importer`). It reads environment variables from `.env` and mounts a `logs` volume. - This tells Compose to rebuild the image(s) and recreate containers for services whose image or configuration changed. 
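For quick reference, the inventory-code rule described in the Validation Policy can be sketched with Python's `re` module. This is a minimal illustration only: `is_valid_inventory_code` is a hypothetical helper, not the project's actual `validation_utils` API.

```python
import os
import re

# 12-character inventory code documented above, e.g. VO-UMT-14387.
INVENTORY_CODE_RE = re.compile(r'^[VA][OC]-[A-Z0-9]{3}-\d{5}$')

def is_valid_inventory_code(object_key: str) -> bool:
    """Match the first 12 characters of the object's base name against the pattern."""
    base_name = os.path.basename(object_key)
    return INVENTORY_CODE_RE.match(base_name[:12]) is not None

print(is_valid_inventory_code('UMT/VO-UMT-14387_H264.mp4'))  # True
print(is_valid_inventory_code('UMT/VA-C01-12345.mp4'))       # False (2nd char must be O or C)
```

Note that the streaming-copy suffix (`_H264`) and the file extension are ignored here; extension rules are enforced separately during Phase 1.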
+### Build & run - 2) Full clean rebuild (use when you need to remove volumes or ensure a clean state): +```bash +docker compose up -d --build +``` - ```powershell - # Stop and remove containers, networks, and optionally volumes & images, then rebuild - docker compose down --volumes --rmi local - docker compose up -d --build - ``` +### Logs - Notes - - `docker compose up -d --build` will recreate containers for services that need updating; it does not destroy named volumes unless you pass `--volumes` to `down`. - - If you need to execute a shell inside the running container: +```bash +docker compose logs -f app +``` - ```powershell - # run a shell inside the 'app' service - docker compose exec app /bin/sh - # or (if bash is available) - docker compose exec app /bin/bash - ``` +### Stop - Environment and configuration - - Provide sensitive values via a `.env` file (the `docker-compose.yml` already references `.env`). - - Typical variables: `AWS_ACCESS_KEY_ID`, `AWS_SECRET_ACCESS_KEY`, `AWS_REGION`, `BUCKET_NAME`, `DB_HOST`, `DB_NAME`, `DB_USER`, `SMTP_SERVER`, etc. +```bash +docker compose stop +``` - Troubleshooting - - If Compose fails to pick up code changes, ensure your local Dockerfile `COPY` commands include the source files and that `docker compose up -d --build` is run from the repository root. - - Use `docker compose logs -f app` to inspect runtime errors. +### Rebuild (clean) - If you'd like, I can add a short `Makefile` or a PowerShell script to wrap the common Docker Compose commands (build, rebuild, logs) for convenience. +```bash +docker compose down --volumes --rmi local +docker compose up -d --build +``` - --- - Edited to add clear docker-compose rebuild and run instructions. +--- + +## Configuration + +Configuration is driven by `.env` and `config.py`. 
Key variables include: +- `AWS_ACCESS_KEY_ID`, `AWS_SECRET_ACCESS_KEY`, `AWS_REGION`, `BUCKET_NAME` +- `DB_HOST`, `DB_NAME`, `DB_USER`, `DB_PASSWORD`, `DB_PORT` +- `ACH_DRY_RUN` (`true` / `false`) +- `ACH_SAFE_RUN` (`true` / `false`) + +--- + +## Troubleshooting + +- If Docker does not pick up changes, ensure `docker compose up -d --build` is run from the repo root. +- Inspect runtime errors via `docker compose logs -f app`. diff --git a/config.py b/config.py index 739f200..ddb0dd7 100644 --- a/config.py +++ b/config.py @@ -66,5 +66,21 @@ def load_config(): return aws_config, db_config, ach_config, bucket_name, ach_variables +EXTENSION_MIME_MAP = { + '.avi': 'video/x-msvideo', + '.mov': 'video/quicktime', + '.wav': 'audio/wav', + '.mp4': 'video/mp4', + '.m4v': 'video/mp4', + '.mp3': 'audio/mp3', + '.mxf': 'application/mxf', + '.mpg': 'video/mpeg', + '.aif': 'audio/aiff', + '.wmv': 'video/x-ms-asf', + '.m4a': 'audio/mp4', +} -# Consider using a class for a more structured approach (optional) \ No newline at end of file +MIME_TYPES = sorted(list(set(EXTENSION_MIME_MAP.values()))) + + +# Consider using a class for a more structured approach (optional) diff --git a/countfiles.py b/countfiles.py index cd1af99..f0ee9be 100644 --- a/countfiles.py +++ b/countfiles.py @@ -130,33 +130,13 @@ def main_process(aws_config, db_config, ach_config, bucket_name, ach_variables): # connect to database conn = psycopg2.connect(**db_config) cur = conn.cursor() - # function count_files that are wav and mov in db - # Map file extensions (include leading dot) to mime types - EXTENSION_MIME_MAP = { - '.avi': 'video/x-msvideo', - '.mov': 'video/mov', - '.wav': 'audio/wav', - '.mp4': 'video/mp4', - '.m4v': 'video/mp4', - '.mp3': 'audio/mp3', - '.mxf': 'application/mxf', - '.mpg': 'video/mpeg', - } + + # Use centralized mime types from config + from config import EXTENSION_MIME_MAP, MIME_TYPES - # populate mime_type list with all relevant MediaInfo/MIME values - mime_type = [ - 
'video/x-msvideo', # .avi - 'video/mov', # .mov - 'audio/wav', # .wav - 'video/mp4', # .mp4, .m4v - 'audio/mp3', # .mp3 - 'application/mxf', # .mxf - 'video/mpeg', # .mpg - ] + logging.info(f"Mime types for counting files: {MIME_TYPES}") - logging.info(f"Mime types for counting files: {mime_type}") - - all_files_on_db = count_files(cur, mime_type,'*', False) + all_files_on_db = count_files(cur, MIME_TYPES,'*', False) mov_files_on_db = count_files(cur,['video/mov'],'.mov', False ) mxf_files_on_db = count_files(cur,['application/mxf'],'.mxf', False ) mpg_files_on_db = count_files(cur,['video/mpeg'],'.mpg', False ) diff --git a/db_utils.py b/db_utils.py index 8ed3bd4..5f13640 100644 --- a/db_utils.py +++ b/db_utils.py @@ -34,18 +34,7 @@ from email_utils import handle_error import json import os import config - -# Map file extensions (include leading dot) to mime types -EXTENSION_MIME_MAP = { - '.avi': 'video/x-msvideo', - '.mov': 'video/mov', - '.wav': 'audio/wav', - '.mp4': 'video/mp4', - '.m4v': 'video/mp4', - '.mp3': 'audio/mp3', - '.mxf': 'application/mxf', - '.mpg': 'video/mpeg', -} +from config import EXTENSION_MIME_MAP def get_mime_for_extension(extension: str) -> str: """Return the mime type for an extension. Accepts with or without leading dot. @@ -58,6 +47,76 @@ def get_mime_for_extension(extension: str) -> str: extension = f'.{extension}' return EXTENSION_MIME_MAP.get(extension.lower(), 'application/octet-stream') + +def get_mime_from_mediainfo(ach_variables: dict) -> str: + """Determine a MIME type from the JSON sidecar mediainfo. + + This is used to capture the *master* format (conservatory copy) even when + the stream copy on S3 is a different container (e.g., _H264.mp4). + + If mediainfo is missing or cannot be mapped, fall back to extension-based mapping. + """ + # Prefer the master (conservative copy) extension when it is explicitly available. 
+ # In some cases MediaInfo reports "MPEG-4" for .mov containers, so the extension + # is a more reliable hint for the correct mime type. + conservative_ext = ach_variables.get('conservative_copy_extension') + if conservative_ext and conservative_ext.lower() == '.mov': + return 'video/quicktime' + + # Also check the actual conservative copy object key (from JSON @ref). This is the + # name that will be stored in the DB as the master file, so it should drive the MIME. + conservative_copy_key = ach_variables.get('objectKeys', {}).get('conservative_copy', '') + if conservative_copy_key and os.path.splitext(conservative_copy_key)[1].lower() == '.mov': + return 'video/quicktime' + + # Try to find the General track format from mediainfo + try: + mediainfo = ach_variables.get('custom_data_in', {}).get('mediainfo', {}) + tracks = mediainfo.get('media', {}).get('track', []) + for track in tracks: + if track.get('@type', '') == 'General': + format_value = track.get('Format', '') + if format_value: + # Map common MediaInfo format values to MIME types + mapping = { + 'AVI': 'video/x-msvideo', + 'MOV': 'video/quicktime', + 'QuickTime': 'video/quicktime', + 'MPEG-4': 'video/mp4', + 'MP4': 'video/mp4', + 'MXF': 'application/mxf', + 'MPEG': 'video/mpeg', + 'MPEG-PS': 'video/mpeg', + 'MPEG-TS': 'video/MP2T', + 'MPEG Audio': 'audio/mpeg', + 'MPEG Audio/Layer 3': 'audio/mpeg', + 'AAC': 'audio/aac', + 'PCM': 'audio/wav', + 'WAV': 'audio/wav', + 'AIFF': 'audio/aiff', + 'FLAC': 'audio/flac', + } + # Do a case-insensitive match + for k, v in mapping.items(): + if format_value.lower() == k.lower(): + return v + # Try a fuzzy match based on known substrings + if 'avi' in format_value.lower(): + return 'video/x-msvideo' + if 'mp4' in format_value.lower(): + return 'video/mp4' + if 'mpeg' in format_value.lower(): + return 'video/mpeg' + if 'wav' in format_value.lower() or 'pcm' in format_value.lower(): + return 'audio/wav' + if 'mp3' in format_value.lower(): + return 'audio/mpeg' + # Fall 
back to extension-based mapping when metadata doesn't yield a mime
+        extension = ach_variables.get('extension')
+        return get_mime_for_extension(extension)
+    except Exception:
+        return get_mime_for_extension(ach_variables.get('extension'))
+

 def get_distinct_filenames_from_db():
     """Retrieve distinct digital file names from the Postgres DB.
@@ -94,15 +153,19 @@ def check_inventory_in_db(s3_client, cur, base_name):
     # Define the pattern for the inventory code
     media_tipology_A = ['MCC', 'OA4', 'DAT']
+    # FOR FILE: add to media_tipology_A separately for readability
+    media_tipology_A += ['M4A','AIF']  # add for "FILE" folders 04112025
+    # TODO add other typologies: AVI, M4V, MOV, MP4, MXF, MPG (done 04112025)
     media_tipology_V = [
         'OV1', 'OV2', 'UMT', 'VHS', 'HI8', 'VD8', 'BTC', 'DBT', 'IMX', 'DVD',
-        'CDR', 'MDV', 'DVC', 'HDC', 'BRD', 'CDV',
-        'AVI', 'M4V', 'MOV', 'MP4', 'MXF', 'MPG'  # add for "file" folders 04112025
+        'CDR', 'MDV', 'DVC', 'HDC', 'BRD', 'CDV'
     ]
+    # FOR FILE: add to media_tipology_V separately for readability
+    media_tipology_V += ['AVI', 'M4V', 'MOV', 'MP4', 'MXF', 'MPG', 'WMV']  # add for "FILE" folders 04112025

-    # list of known mime types (derived from EXTENSION_MIME_MAP)
-    mime_type = list({v for v in EXTENSION_MIME_MAP.values()})
+    # Use centralized mime types from config
+    from config import MIME_TYPES

     try:
         logging.info(f"SUPPORT TYPOLOGY : {base_name[3:6]}")
@@ -270,8 +333,8 @@ def add_file_record_and_relationship(s3_client, cur, base_name,ach_variables):
         file_availability_dict = 7  # Place Holder
         # add a new file record for the "copia conservativa"
         ach_variables['custom_data_in']['media_usage'] = 'master'  # can be "copia conservativa"
-        # determine master mime type from the file extension
-        master_mime_type = get_mime_for_extension(ach_variables.get('extension'))
+        # determine master mime type using the JSON sidecar metadata (preferred)
+        master_mime_type = get_mime_from_mediainfo(ach_variables)
         new_file_id = add_file_record(
             cur,
diff --git a/file_utils.py b/file_utils.py
index a62b90b..72b3ea8 100644 --- a/file_utils.py +++ b/file_utils.py @@ -285,7 +285,31 @@ def extract_and_validate_file_info(file_contents, file, ach_variables): logging.warning(f"ach_file_fullpath '{basename_fullpath}' does not match JSON ffprobe file name '{basename_ffprobe}'.") else: logging.info(f"ach_file_fullpath '{basename_fullpath}' matches JSON ffprobe file name '{basename_ffprobe}'.") - + + # Check folder prefixes (e.g. FILE/ vs DBT/) match between S3 file and JSON refs + def _extract_prefix(path: str) -> str: + if not path: + return '' + # Normalize separators to '/' so we can reliably split on a single character + normalized = path.replace('\\', '/').lstrip('/') + return normalized.split('/', 1)[0] if '/' in normalized else normalized + + prefix_fullpath = _extract_prefix(ach_variables['file_fullpath']) + prefix_mediainfo = _extract_prefix(json_ref_mediainfo_path) + prefix_ffprobe = _extract_prefix(json_ref_ffprobe_path) + + if prefix_fullpath != prefix_mediainfo or prefix_fullpath != prefix_ffprobe: + logging.warning( + "Prefix mismatch for S3 file '%s': S3 prefix='%s' (fullpath='%s') vs JSON prefixes (mediainfo='%s' [%s], ffprobe='%s' [%s]).", + ach_variables.get('file_fullpath'), + prefix_fullpath, + ach_variables.get('file_fullpath'), + prefix_mediainfo, + json_ref_mediainfo_path, + prefix_ffprobe, + json_ref_ffprobe_path, + ) + if basename_fullpath != basename_mediainfo and basename_fullpath != basename_ffprobe: logging.error(f"ach_file_fullpath '{basename_fullpath}' does not match either JSON file name '{basename_mediainfo}' or '{basename_ffprobe}'.") raise ValueError(f"ach_file_fullpath '{basename_fullpath}' does not match either JSON file name '{basename_mediainfo}' or '{basename_ffprobe}'.") diff --git a/logging_config.py b/logging_config.py index 4a9b10c..9adb982 100644 --- a/logging_config.py +++ b/logging_config.py @@ -33,8 +33,8 @@ def _create_timed_handler(path: str, level=None, when='midnight', interval=1, ba """ 
    _ensure_dir_for_file(path)
    handler = TimedRotatingFileHandler(path, when=when, interval=interval, backupCount=backupCount, encoding='utf-8')
-    # Use a readable suffix for rotated files (handler will append this after the filename)
-    handler.suffix = "%Y%m%d_%H%M%S"
+    # Keep the default behavior: append to the existing log file, and rotate
+    # at the interval configured by 'when'.
    if fmt:
        handler.setFormatter(fmt)
    if level is not None:
@@ -58,6 +58,8 @@ def setup_logging():
    error_log_path = os.getenv('ERROR_LOG_FILE_PATH', "./logs/ACH_media_import_errors.log")
    warning_log_path = os.getenv('WARNING_LOG_FILE_PATH', "./logs/ACH_media_import_warnings.log")
+    if os.getenv('WARING_LOG_FILE_PATH'):  # back-compat: honor the misspelled variable if it is set in .env
+        warning_log_path = os.getenv('WARING_LOG_FILE_PATH')
    info_log_path = os.getenv('INFO_LOG_FILE_PATH', "./logs/ACH_media_import_info.log")

    # Create three handlers: info (all), warning (warning+), error (error+)
diff --git a/main.py b/main.py
index e880802..7bb25bb 100644
--- a/main.py
+++ b/main.py
@@ -12,7 +12,7 @@ from error_handler import handle_general_error, handle_file_not_found_error, han
 from file_utils import is_file_empty
 from db_utils import count_files, get_distinct_filenames_from_db
 from dotenv import load_dotenv
-from validation_utils import validate_inventory_code, analyze_pattern_match, validate_icode_extension
+from validation_utils import validate_inventory_code, analyze_pattern_match, validate_icode_extension, list_s3_not_in_db, validate_mp4_file, validate_mp3_file
 import config
 import psycopg2
@@ -33,6 +33,25 @@ def main_process(aws_config, db_config, ach_config, bucket_name, ach_variables):
     logging.info(f"bucket_name: {bucket_name}")

+    # SECURITY CHECK: If DRY_RUN is false and ENV is development, ask for confirmation
+    dry_run_env = os.getenv('ACH_DRY_RUN', 'true').lower()
+    ach_env = os.getenv('ACH_ENV', 'development').lower()
+
+    if dry_run_env == 'false' and ach_env == 'development':
+        print("\n" + "!"*60)
+        print("!!! SECURITY CHECK: RUNNING IMPORT ON DEVELOPMENT ENVIRONMENT !!!")
+        print(f"DB_HOST: {db_config.get('host')}")
+        print(f"DB_NAME: {db_config.get('database')}")
+        print(f"DB_USER: {db_config.get('user')}")
+        print(f"DB_PORT: {db_config.get('port')}")
+        print("!"*60 + "\n")
+
+        user_input = input(f"Please type the DB_NAME '{db_config.get('database')}' to proceed: ")
+        if user_input != db_config.get('database'):
+            print("Action aborted by user. Database name did not match.")
+            logging.error("Process aborted: User failed to confirm DB_NAME for development import.")
+            return
+
     # Ensure timing variables are always defined so later error-email logic
     # won't fail if an exception is raised before end_time/elapsed_time is set.
     start_time = time.time()
@@ -44,6 +63,20 @@ def main_process(aws_config, db_config, ach_config, bucket_name, ach_variables):
     try:
         logging.info("Starting the main process...")

+        # ---------------------------------------------------------------------
+        # PHASE 1: S3 OBJECT DISCOVERY + INITIAL VALIDATION
+        #
+        # 1) List objects in the configured S3 bucket.
+        # 2) Filter objects by allowed extensions and excluded folders.
+        # 3) Validate the inventory code format (e.g. VO-UMT-14387) and ensure the
+        #    folder prefix matches the code type (e.g. "BRD" folder for BRD code).
+        # 4) Reject files that violate naming conventions before any DB interaction.
+        #
+        # This phase is intentionally descriptive so the workflow can be understood
+        # from logs even if the function names are not immediately clear.
+ # --------------------------------------------------------------------- + logging.info("PHASE 1: S3 object discovery + initial validation") + # Helper to make spaces visible in filenames for logging (replace ' ' with open-box char) def _visible_spaces(name: str) -> str: try: @@ -60,7 +93,9 @@ def main_process(aws_config, db_config, ach_config, bucket_name, ach_variables): # Define valid extensions and excluded folders valid_extensions = {'.mp3', '.mp4', '.md5', '.json', '.pdf'} # excluded_folders = {'DOCUMENTAZIONE_FOTOGRAFICA/', 'TEST-FOLDER-DEV/', 'FILE/'} - excluded_folders = {'DOCUMENTAZIONE_FOTOGRAFICA/', 'TEST-FOLDER-DEV/'} + # excluded_folders = {'DOCUMENTAZIONE_FOTOGRAFICA/', 'TEST-FOLDER-DEV/', 'TST/', 'FILE/', 'DVD/', 'UMT/'} + excluded_folders = {'DOCUMENTAZIONE_FOTOGRAFICA/', 'TEST-FOLDER-DEV/', 'TST/', 'UMT/'} + # excluded_folders = {'DOCUMENTAZIONE_FOTOGRAFICA/', 'TEST-FOLDER-DEV/', 'TST/',} # included_folders = {'FILE/'} # uncomment this to NOT use excluded folders # included_folders = {'TEST-FOLDER-DEV/'} # uncomment this to NOT use excluded folders @@ -98,12 +133,36 @@ def main_process(aws_config, db_config, ach_config, bucket_name, ach_variables): # s3_file_names contains the object keys (strings), not dicts. 
            base_name = os.path.basename(s3file)
            logging.info(f"S3 Base name: {base_name}")
+
+            # extract folder prefix and media type from inventory code
+            folder_prefix = os.path.dirname(s3file).rstrip('/')
+            media_type_in_code = base_name[3:6] if len(base_name) >= 6 else None
+
+            # Generic sanity check: the prefix (folder name) should equal the media type in the code
+            is_valid_prefix = (folder_prefix == media_type_in_code)
+
+            # Special folder allowance rules
+            folder_allowances = {
+                'DVD': ['DVD', 'BRD'],
+                'FILE': ['M4V', 'AVI', 'MOV', 'MP4', 'MXF', 'AIF', 'WMV', 'M4A', 'MPG'],
+            }
+
+            if folder_prefix in folder_allowances:
+                if media_type_in_code in folder_allowances[folder_prefix]:
+                    is_valid_prefix = True
+
+            if folder_prefix and media_type_in_code and not is_valid_prefix:
+                logging.warning(f"Prefix mismatch for {s3file}: Folder '{folder_prefix}' does not match code type '{media_type_in_code}'")
+                # we only warn here and still proceed with standard validation
+
            if validate_inventory_code(base_name):  # truncated to first 12 char in the function
-                logging.info(f"File {base_name} matches pattern.")
-                # if valid check extension too
-                if not validate_icode_extension(s3file):
-                    logging.warning(f"File {s3file} has invalid extension for its inventory code.")
-                    continue  # skip adding this file to validated contents
+                logging.info(f"File {base_name} matches pattern.")
+                # only check the inventory-code extension for media files (.mp4, .mp3);
+                # sidecars (.json, .pdf, .md5) only need their base name validated
+                if s3file.lower().endswith(('.mp4', '.mp3')):
+                    if not validate_icode_extension(s3file):
+                        logging.warning(f"File {s3file} has invalid extension for its inventory code.")
+                        continue  # skip adding this file to validated contents
                s3_validated_contents.append(s3file)
            else:
                # Check base name in case of error
@@ -112,6 +171,15 @@ def main_process(aws_config, db_config, ach_config, bucket_name, ach_variables):
                folder_name = os.path.dirname(s3file)
                logging.warning(f"File {s3file} in 
folder {folder_name} does not match pattern.") + # --------------------------------------------------------------------- + # PHASE 2: DATABASE CROSS-REFERENCE + FILTERING + # + # 1) Fetch existing filenames from the database. + # 2) Skip files already represented in the DB (including sidecar records). + # 3) Produce the final list of S3 object keys that should be parsed/inserted. + # --------------------------------------------------------------------- + logging.info("PHASE 2: Database cross-reference + filtering") + # filter_s3_files_not_in_db # --- Get all DB filenames in one call --- db_file_names = get_distinct_filenames_from_db() @@ -138,6 +206,14 @@ def main_process(aws_config, db_config, ach_config, bucket_name, ach_variables): total_files = len(filtered_file_names) logging.info(f"Total number of the valid (mp3,mp4,md5,json,pdf) files after DB filter: {total_files}") + # Log the files that need to be updated (those not yet in DB) + if total_files > 0: + logging.info("List of files to be updated in the database:") + for f in filtered_file_names: + logging.info(f" - {f}") + else: + logging.info("No new files found to update in the database.") + # Count files with .mp4 and .mp3 extensions mp4_count = sum(1 for file in s3_file_names if file.endswith('.mp4')) mp3_count = sum(1 for file in s3_file_names if file.endswith('.mp3')) @@ -167,6 +243,10 @@ def main_process(aws_config, db_config, ach_config, bucket_name, ach_variables): for file in s3_file_names: if file.endswith('.mp4'): validate_mp4_file(file) # validation_utils.py - check also _H264 at the end + elif file.endswith('.mp3'): + validate_mp3_file(file) # validation_utils.py + # Count by CODE media type (e.g. OA4, MCC) and log the counts for each type + # If ACH_SAFE_RUN is 'false' we enforce strict mp4/pdf parity and abort # when mismatched. 
Default is 'true' which skips this abort to allow @@ -177,7 +257,7 @@ def main_process(aws_config, db_config, ach_config, bucket_name, ach_variables): logging.error("Number of .mp4 files is not equal to number of .pdf files") # MOD 20251103 # add a check to find the missing pdf or mp4 files and report them - # use file_names to find missing files + # use filtered_file_names to find missing files # store tuples (source_file, expected_counterpart) for clearer logging missing_pdfs = [] # list of (mp4_file, expected_pdf) missing_mp4s = [] # list of (pdf_file, expected_mp4) @@ -187,7 +267,7 @@ def main_process(aws_config, db_config, ach_config, bucket_name, ach_variables): base_name = os.path.splitext(file)[0] # if the mp4 is an H264 variant (e.g. name_H264.mp4) remove the suffix if base_name.endswith('_H264'): - # must check if has extra number for DBT and DVD amd [FILE] + # must check if has extra number for DBT and DVD and [FILE] base_name = base_name[:-5] expected_pdf = base_name + '.pdf' if expected_pdf not in filtered_file_names: @@ -220,14 +300,10 @@ def main_process(aws_config, db_config, ach_config, bucket_name, ach_variables): if mp3_count + mp4_count != json_count: logging.error("Number of .mp3 files + number of .mp4 files is not equal to number of .json files") - - # add check of mp3 +6 mp4 vs json and md5 file like above for mp4 and pdf logging.error("Abort Import Process due to missing files") - - # search wich file dont match TODO raise ValueError("Inconsistent file counts mp3+mp4 vs json") @@ -237,11 +313,20 @@ def main_process(aws_config, db_config, ach_config, bucket_name, ach_variables): # search wich file dont match TODO raise ValueError("Inconsistent file counts mp3+mp4 vs md5") + # --------------------------------------------------------------------- + # PHASE 3: PARSE & INSERT INTO DATABASE + # + # 1) Process each remaining S3 object and validate its associated metadata. + # 2) Insert new records into the database (unless running in DRY_RUN). 
+ # 3) Report counts of successful uploads, warnings, and errors. + # --------------------------------------------------------------------- + logging.info("PHASE 3: Parse S3 objects and insert new records into the database") + # Try to parse S3 files try: # If DRY RUN is set to True, the files will not be uploaded to the database if os.getenv('ACH_DRY_RUN', 'true') == 'false': - uploaded_files_count, warning_files_count, error_files_count = parse_s3_files(s3_client, file_names, ach_variables, excluded_folders) + uploaded_files_count, warning_files_count, error_files_count = parse_s3_files(s3_client, filtered_file_names, ach_variables, excluded_folders) else: logging.warning("DRY RUN is set to TRUE - No files will be added to the database") # set the tuples to zero @@ -258,33 +343,13 @@ def main_process(aws_config, db_config, ach_config, bucket_name, ach_variables): # connect to database conn = psycopg2.connect(**db_config) cur = conn.cursor() - # function count_files that are wav and mov in db - # Map file extensions (include leading dot) to mime types - EXTENSION_MIME_MAP = { - '.avi': 'video/x-msvideo', - '.mov': 'video/mov', - '.wav': 'audio/wav', - '.mp4': 'video/mp4', - '.m4v': 'video/mp4', - '.mp3': 'audio/mp3', - '.mxf': 'application/mxf', - '.mpg': 'video/mpeg', - } + + # Use centralized mime types from config + from config import EXTENSION_MIME_MAP, MIME_TYPES - # populate mime_type list with all relevant MediaInfo/MIME values - mime_type = [ - 'video/x-msvideo', # .avi - 'video/mov', # .mov - 'audio/wav', # .wav - 'video/mp4', # .mp4, .m4v - 'audio/mp3', # .mp3 - 'application/mxf', # .mxf - 'video/mpeg', # .mpg - ] + logging.info(f"Mime types for counting files: {MIME_TYPES}") - logging.info(f"Mime types for counting files: {mime_type}") - - all_files_on_db = count_files(cur, mime_type,'*', False) + all_files_on_db = count_files(cur, MIME_TYPES,'*', False) mov_files_on_db = count_files(cur,['video/mov'],'.mov', False ) mxf_files_on_db = 
count_files(cur,['application/mxf'],'.mxf', False ) mpg_files_on_db = count_files(cur,['video/mpeg'],'.mpg', False ) @@ -300,18 +365,18 @@ def main_process(aws_config, db_config, ach_config, bucket_name, ach_variables): logging.info(f"Number of .mp4 files in the database: {mp4_files_on_db} and S3: {mp4_count}") # compare the mp4 name and s3 name and report the missing files in the 2 lists a print the list - missing_mp4s = [f for f in file_names if f.endswith('.mp4') and f not in db_file_names] + missing_mp4s = [f for f in s3_file_names if f.endswith('.mp4') and f not in db_file_names] # if missing_mp4s empty do not return a warning if missing_mp4s: - logging.warning(f"Missing .mp4 files in DB compared to S3: {missing_mp4s}") + logging.warning(f"Missing {len(missing_mp4s)} .mp4 files in DB compared to S3: {missing_mp4s}") logging.info(f"Number of .wav files in the database: {wav_files_on_db} ") logging.info(f"Number of .mp3 files in the database: {mp3_files_on_db} and S3: {mp3_count}") - missing_mp3s = [f for f in file_names if f.endswith('.mp3') and f not in db_file_names] + missing_mp3s = [f for f in s3_file_names if f.endswith('.mp3') and f not in db_file_names] # if missing_mp3s empty do not return a warning if missing_mp3s: - logging.warning(f"Missing .mp3 files in DB compared to S3: {missing_mp3s}") + logging.warning(f"Missing {len(missing_mp3s)} .mp3 files in DB compared to S3: {missing_mp3s}") logging.info(f"Number of .avi files in the database: {avi_files_on_db} ") logging.info(f"Number of .m4v files in the database: {m4v_files_on_db} ") diff --git a/s3_utils.py b/s3_utils.py index b9c2c42..88ae08f 100644 --- a/s3_utils.py +++ b/s3_utils.py @@ -31,10 +31,16 @@ def parse_s3_files( s3, s3_files, ach_variables, excluded_folders=[]): # logg ach_variables logging.info(f"ach_variables: {ach_variables}") - logging.info(f"Starting to parse S3 files from bucket {bucket_name}...") + # --------------------------------------------------------------------- + # PHASE 3: 
PARSE & INSERT INTO DATABASE (DETAILS) + # + # 3.1) Filter out excluded prefixes and keep only files we care about. + # 3.2) Validate each media file alongside its related sidecars (.json, .md5, .pdf). + # 3.3) Cross-check the inventory code in the database and insert new records. + # --------------------------------------------------------------------- + logging.info("PHASE 3: Parse & insert - starting detailed file processing") try: - logging.info(f"Starting to parse S3 files from bucket {bucket_name}...") # Ensure db_config is not None if db_config is None: raise ValueError("Database configuration is not loaded") @@ -66,7 +72,10 @@ def parse_s3_files( s3, s3_files, ach_variables, excluded_folders=[]): # Check the result and proceed accordingly if result: # logging.warning(f"File {file} already exists in the database.") - warning_files_count +=1 + warning_files_count += 1 + if os.getenv('ACH_SAFE_RUN', 'true').lower() == 'true': + logging.error("ACH_SAFE_RUN=true: aborting Phase 3 due to warnings (file already exists in DB): %s", file) + raise ValueError("ACH_SAFE_RUN=true: aborting due to warnings in Phase 3") continue ach_variables['file_fullpath'] = file # is the Object key @@ -76,12 +85,13 @@ def parse_s3_files( s3, s3_files, ach_variables, excluded_folders=[]): ach_variables['objectKeys']['media'] = file ach_variables['objectKeys']['pdf'] = f"{os.path.splitext(file)[0]}.pdf" ach_variables['objectKeys']['pdf'] = ach_variables['objectKeys']['pdf'].replace('_H264', '') + from config import EXTENSION_MIME_MAP if file.endswith('.mp4'): ach_variables['objectKeys']['conservative_copy'] = f"{os.path.splitext(file)[0]}.mov" # remove _H264 is done later elif file.endswith('.mp3'): ach_variables['objectKeys']['conservative_copy'] = f"{os.path.splitext(file)[0]}.wav" else: - logging.KeyError(f"Unsupported file type: {file}") + logging.error(f"Unsupported file type: {file}") error_files_count +=1 continue @@ -115,7 +125,10 @@ def parse_s3_files( s3, s3_files, 
ach_variables, excluded_folders=[]): logging.info(f"The media file disk size is: {ach_variables['media_disk_size']}") else: logging.warning("Could not retrieve file size for %s.", file) - warning_files_count +=1 + warning_files_count += 1 + if os.getenv('ACH_SAFE_RUN', 'true').lower() == 'true': + logging.error("ACH_SAFE_RUN=true: aborting Phase 3 due to warnings (missing file size): %s", file) + raise ValueError("ACH_SAFE_RUN=true: aborting due to warnings in Phase 3") continue # Skip to the next file in the loop logging.info("Start Validating files for %s...", base_name) diff --git a/utils.py b/utils.py index ce52927..72c782a 100644 --- a/utils.py +++ b/utils.py @@ -24,8 +24,10 @@ def check_video_info(media_info): # If the parent directory is 'FILE' accept multiple container types if parent_dir.lower() == 'file': # Accept .mov, .avi, .m4v, .mp4, .mxf, .mpg (case-insensitive) - if not any(file_name.lower().endswith(ext) for ext in ('.mov', '.avi', '.m4v', '.mp4', '.mxf', '.mpg', '.mpeg')): - return False, "The file is not a .mov, .avi, .m4v, .mp4, .mxf, .mpg or .mpeg file." + # video allowed extensions + video_allowed_extensions = ['.mov', '.avi', '.m4v', '.mp4', '.mxf', '.mpg', '.mpeg', '.wmv'] + if not any(file_name.lower().endswith(ext) for ext in video_allowed_extensions): + return False, "The file is not a .mov, .avi, .m4v, .mp4, .mxf, .mpg, .mpeg or .wmv file." # Map file extensions to lists of acceptable general formats (video) general_formats = { @@ -58,18 +60,23 @@ def check_video_info(media_info): # Outside FILE/ directory require .mov specifically if not file_name.lower().endswith('.mov'): return False, "The file is not a .mov file."
- # Check if track 1's format is ProRes + + # Strict master MOV rule: track[1] must be ProRes tracks = media_info.get('media', {}).get('track', []) - if len(tracks) > 1: - track_1 = tracks[1] # Assuming track 1 is the second element (index 1) - logging.info(f"Track 1: {track_1}") - if track_1.get('@type', '') == 'Video' and track_1.get('Format', '') == 'ProRes' and track_1.get('Format_Profile', '') == '4444': - return True, "The file is a .mov file with ProRes format in track 1." - else: - return False, "Track 1 format is not ProRes." - else: + if len(tracks) <= 1: return False, "No track 1 found." + track_1 = tracks[1] # track[1] should represent the video stream + logging.info(f"Track 1: {track_1}") + if track_1.get('@type', '') != 'Video': + return False, "Track 1 is not a video track." + if track_1.get('Format', '') != 'ProRes': + return False, "Track 1 format is not ProRes." + if track_1.get('Format_Profile', '') != '4444': + return False, "Track 1 format profile is not 4444." + + return True, "The file is a .mov master with ProRes track 1." + return True, "The file passed the video format checks." 
except Exception as e: return False, f"Error processing the content: {e}" @@ -77,10 +84,32 @@ def check_video_info(media_info): # result, message = check_audio_info(json_content) def check_audio_info(media_info): try: - # Check if the file name ends with .wav + # Determine source filename (from JSON) and its parent folder file_name = media_info.get('media', {}).get('@ref', '') - if not file_name.endswith('.wav'): - logging.info(f"File name in JSON: {file_name}") + parent_dir = os.path.basename(os.path.dirname(file_name)) + + # If the file lives under FILE/, allow MP3/WAV/M4A/AIF as valid audio containers + if parent_dir.lower() == 'file': + audio_allowed_extensions = ['.wav', '.mp3', '.m4a', '.aif'] + if not any(file_name.lower().endswith(ext) for ext in audio_allowed_extensions): + return False, f"The file is not one of the allowed audio containers: {', '.join(audio_allowed_extensions)}." + + # For WAV, do the strict Wave/PCM validation + if file_name.lower().endswith('.wav'): + tracks = media_info.get('media', {}).get('track', []) + if len(tracks) > 1: + track_1 = tracks[1] + if track_1.get('@type', '') == 'Audio' and track_1.get('Format', '') == 'PCM' and track_1.get('SamplingRate', '') == '96000' and track_1.get('BitDepth', '') == '24': + return True, "The file is a .wav file with Wave format in track 1." + else: + return False, f"Track 1 format is not Wave. Format: {track_1.get('Format', '')}, SamplingRate: {track_1.get('SamplingRate', '')}, BitDepth: {track_1.get('BitDepth', '')}" + return False, "No track 1 found." + + # For MP3/M4A/AIF, accept without the strict Wave/PCM validation + return True, "The file is an accepted audio container under FILE/ (mp3/m4a/aif)." + + # Outside FILE/ directory require .wav specifically + if not file_name.lower().endswith('.wav'): return False, "The file is not a .wav file."
# Check if track 1's format is Wave @@ -91,8 +120,8 @@ def check_audio_info(media_info): if track_1.get('@type', '') == 'Audio' and track_1.get('Format', '') == 'PCM' and track_1.get('SamplingRate', '') == '96000' and track_1.get('BitDepth', '') == '24': return True, "The file is a .wav file with Wave format in track 1." else: - return False, f"Track 1 format is not Wave. Format: {track_1.get('Format', '')}, SamplingRate: {track_1.get('SamplingRate', '')}, BitDepth: {track_1.get('BitDepth', '')}" - + return False, f"Track 1 format is not Wave. Format: {track_1.get('Format', '')}, SamplingRate: {track_1.get('SamplingRate', '')}, BitDepth: {track_1.get('BitDepth', '')}" + return False, "No track 1 found." except Exception as e: diff --git a/validation_utils.py b/validation_utils.py index 0b5e98d..ef3eec7 100644 --- a/validation_utils.py +++ b/validation_utils.py @@ -110,17 +110,39 @@ def validate_icode_extension(file_inventory_code): 'BTC': r'_\d{4}', 'OA4': r'_\d{2}', 'DVD': r'_\d{2}', - 'MCC': r'_[AB]' + 'BRD': r'_\d{2}', + 'MCC': r'_[AB]', + 'DBT': r'_\d{4}', + 'M4V': r'_\d{2}', + 'AVI': r'_\d{2}', + 'MOV': r'_\d{2}', + 'MP4': r'_\d{2}', + 'MXF': r'_\d{2}', + 'MPG': r'_\d{2}' } if not isinstance(file_inventory_code, str) or file_inventory_code == '': logging.warning("Empty or non-string inventory code provided to validate_icode_extension") return False + # security: ignore any folder prefix or path components that might be + # passed in from external sources. 
+ file_inventory_code = os.path.basename(file_inventory_code) + + # remove common file extensions and only remove _H264 if it's an .mp4 file + if file_inventory_code.lower().endswith('.mp4'): + file_inventory_code = file_inventory_code.replace("_H264", "") + + file_inventory_code = os.path.splitext(file_inventory_code)[0] + # Enforce maximum length (12 base + up to 5 extension chars) if len(file_inventory_code) > 17: logging.warning("Inventory code '%s' exceeds maximum allowed length (17).", file_inventory_code) - raise ValueError("Inventory code with extension exceeds maximum length of 17 characters.") + # Only raise the error if DRY RUN is false; otherwise, just log it as a warning + if os.getenv('ACH_DRY_RUN', 'true').lower() == 'false': + raise ValueError("Inventory code with extension exceeds maximum length of 17 characters.") + else: + return False # Validate base first (first 12 chars). If base invalid -> reject. base = file_inventory_code[:12] @@ -132,6 +154,22 @@ def validate_icode_extension(file_inventory_code): support_type = base[3:6] extension = file_inventory_code[12:] + if extension == '': + logging.info("Extension for '%s' is empty (valid).", file_inventory_code) + return True + + expected_ext_pattern = file_type_to_regex.get(support_type) + if expected_ext_pattern is None: + logging.warning("Unsupported type '%s' for extension validation in '%s'.", support_type, file_inventory_code) + return False + + if not re.fullmatch(expected_ext_pattern, extension): + logging.warning("Extension '%s' does not match expected pattern '%s' for type '%s'.", extension, expected_ext_pattern, support_type) + return False + + logging.info("Inventory code with extension '%s' is valid.", file_inventory_code) + return True + def analyze_extension_pattern(file_inventory_code): """Analyze extension and base-length issues for a full inventory code. 
@@ -338,11 +376,13 @@ if __name__ == "__main__": # validate_icode_extension (valid and invalid) {"name": "validate_icode_extension no ext", "fn": lambda: validate_icode_extension("VO-DVD-12345"), "expect": True}, + {"name": "validate_icode_extension folder prefix", "fn": lambda: validate_icode_extension("BTC/VO-DVD-12345"), "expect": True}, {"name": "validate_icode_extension BTC valid", "fn": lambda: validate_icode_extension("VO-BTC-12345_1234"), "expect": True}, {"name": "validate_icode_extension DVD valid", "fn": lambda: validate_icode_extension("VO-DVD-12345_12"), "expect": True}, {"name": "validate_icode_extension MCC valid", "fn": lambda: validate_icode_extension("VO-MCC-12345_A"), "expect": True}, {"name": "validate_icode_extension unsupported type", "fn": lambda: validate_icode_extension("VO-XYZ-12345_12"), "expect": False}, {"name": "validate_icode_extension too long extension (raises)", "fn": lambda: validate_icode_extension("VO-DVD-12345_12345"), "expect_exception": ValueError}, + {"name": "validate_icode_extension prefix too long (raises)", "fn": lambda: validate_icode_extension("XYZ/VO-DVD-12345_12345"), "expect_exception": ValueError}, {"name": "validate_icode_extension BTC invalid pattern", "fn": lambda: validate_icode_extension("VO-BTC-12345_12"), "expect": False}, # mp4/mp3 validators - return lists
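The inventory-code handling touched by this patch (strip any folder prefix with `os.path.basename`, drop a trailing `_H264` only on `.mp4` names, remove the file extension, then validate the 12-character base plus an optional per-support-type suffix) can be sketched as one standalone function. This is an illustrative condensation, not the project's actual `validate_icode_extension`: the name `is_valid_icode` and the trimmed `EXT_RE` table are hypothetical, and for brevity the over-length case simply returns `False`, whereas the patched code may raise `ValueError` depending on `ACH_DRY_RUN`.

```python
import os
import re

# Base 12-character inventory code (same shape as the test cases above,
# e.g. "VO-DVD-12345").
BASE_RE = re.compile(r'^[VA][OC]-[A-Z0-9]{3}-\d{5}$')

# Per-support-type suffix patterns; subset of the patch's table, for illustration.
EXT_RE = {
    'BTC': r'_\d{4}',
    'DVD': r'_\d{2}',
    'MCC': r'_[AB]',
}

def is_valid_icode(code: str) -> bool:
    # Ignore any folder prefix passed in from external sources.
    code = os.path.basename(code)
    # Only .mp4 variants carry the _H264 marker worth stripping.
    if code.lower().endswith('.mp4'):
        code = code.replace('_H264', '')
    # Drop the file extension before length/pattern checks.
    code = os.path.splitext(code)[0]
    # 12-char base plus at most 5 suffix characters.
    if len(code) > 17:
        return False
    base, suffix = code[:12], code[12:]
    if not BASE_RE.fullmatch(base):
        return False
    if suffix == '':
        return True  # a bare base code is valid
    # chars 4-6 of the base name the support type (e.g. "DVD", "BTC")
    pattern = EXT_RE.get(base[3:6])
    return bool(pattern and re.fullmatch(pattern, suffix))
```

Mirroring the self-test table: `is_valid_icode("BTC/VO-BTC-12345_1234.mp4")` holds because the folder prefix and extension are stripped before matching, while `"VO-BTC-12345_12"` fails since BTC suffixes require four digits.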