# ACH-ARKIVO-ImportMedia/main.py
# (527 lines, 26 KiB, Python — file-viewer header retained as a comment so the module parses)
# v20251103 - Main script to import media files from S3 to the database
import logging
import time
from datetime import datetime
import pytz
import os
import re
from logging_config import setup_logging, CUSTOM_ERROR_LEVEL
from email_utils import handle_error, send_email_with_attachment
from s3_utils import create_s3_client, list_s3_bucket, parse_s3_files
from error_handler import handle_general_error, handle_file_not_found_error, handle_value_error
from file_utils import is_file_empty
from db_utils import count_files, get_distinct_filenames_from_db
from dotenv import load_dotenv
from validation_utils import validate_inventory_code, analyze_pattern_match, validate_icode_extension, list_s3_not_in_db, validate_mp4_file, validate_mp3_file
import config
import psycopg2
load_dotenv()
import re
import logging
import os
# MAIN PROCESS
def main_process(aws_config, db_config, ach_config, bucket_name, ach_variables):
    """Import media files from an S3 bucket into the database.

    The workflow has three phases (also announced in the logs):
      1. S3 object discovery + initial validation (naming conventions).
      2. Database cross-reference + filtering (skip files already in the DB).
      3. Parse the remaining S3 objects, insert new records (unless DRY_RUN),
         then send a summary email, attaching the error/warning log when any
         were produced.

    Parameters:
        aws_config: AWS settings consumed by create_s3_client().
        db_config: psycopg2 connection kwargs (host/database/user/port, ...).
        ach_config: project configuration — not read directly in this
            function, passed through from config.load_config().
        bucket_name: name of the S3 bucket to scan.
        ach_variables: project variables forwarded to parse_s3_files().

    Environment variables read: ACH_DRY_RUN, ACH_ENV, ACH_SAFE_RUN,
    EMAIL_RECIPIENTS, ERROR_EMAIL_RECIPIENTS, SUCCESS_EMAIL_RECIPIENTS.
    """
    logging.info(f"bucket_name: {bucket_name}")
    # SECURITY CHECK: if DRY_RUN is disabled while targeting the development
    # environment, require the operator to retype the DB name before writing.
    dry_run_env = os.getenv('ACH_DRY_RUN', 'true').lower()
    ach_env = os.getenv('ACH_ENV', 'development').lower()
    if dry_run_env == 'false' and ach_env == 'development':
        print("\n" + "!"*60)
        print("!!! SECURITY CHECK: RUNNING IMPORT ON DEVELOPMENT ENVIRONMENT !!!")
        print(f"DB_HOST: {db_config.get('host')}")
        print(f"DB_NAME: {db_config.get('database')}")
        print(f"DB_USER: {db_config.get('user')}")
        print(f"DB_PORT: {db_config.get('port')}")
        print("!"*60 + "\n")
        user_input = input(f"Please type the DB_NAME '{db_config.get('database')}' to proceed: ")
        if user_input != db_config.get('database'):
            print("Action aborted by user. Database name did not match.")
            logging.error("Process aborted: User failed to confirm DB_NAME for development import.")
            return
    # Ensure timing variables are always defined so later error-email logic
    # won't fail if an exception is raised before end_time/elapsed_time is set.
    start_time = time.time()
    # Log the start time in human-readable form.
    logging.info(f"Process started at {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(start_time))}")
    end_time = start_time
    elapsed_time = 0.0
    try:
        logging.info("Starting the main process...")
        # ---------------------------------------------------------------------
        # PHASE 1: S3 OBJECT DISCOVERY + INITIAL VALIDATION
        #
        # 1) List objects in the configured S3 bucket.
        # 2) Filter objects by allowed extensions and excluded folders.
        # 3) Validate the inventory code format (e.g. VA-C01-12345) and ensure
        #    the folder prefix matches the code type (e.g. "BRD" folder for a
        #    BRD code).
        # 4) Reject files that violate naming conventions before any DB work.
        # ---------------------------------------------------------------------
        logging.info("PHASE 1: S3 object discovery + initial validation")

        # Helper to make spaces visible in filenames for logging
        # (replaces ' ' with the open-box character U+2423).
        def _visible_spaces(name: str) -> str:
            try:
                return name.replace(' ', '\u2423')
            except Exception:
                return name

        # Create the S3 client and list the bucket contents.
        s3_client = create_s3_client(aws_config)
        list_s3_files = list_s3_bucket(s3_client, bucket_name)
        # Define valid extensions and excluded folders.
        valid_extensions = {'.mp3', '.mp4', '.md5', '.json', '.pdf'}
        excluded_folders = {'DOCUMENTAZIONE_FOTOGRAFICA/', 'TEST-FOLDER-DEV/', 'TST/', 'UMT/'}
        # included_folders = {'FILE/'}  # uncomment to restrict to specific folders instead
        # s3_file_names: include only files that match valid extensions and
        # (if configured) whose key starts with one of the included_folders.
        # We still skip any explicitly excluded_folders. `included_folders` is
        # normally NOT defined (its assignments are commented out above), so
        # the NameError guard below makes the excluded-folders path the default.
        try:
            use_included = bool(included_folders)
        except NameError:
            use_included = False
        if use_included:
            s3_file_names = [
                content['Key'] for content in list_s3_files
                if any(content['Key'].endswith(ext) for ext in valid_extensions)
                and any(content['Key'].startswith(folder) for folder in included_folders)
            ]
            logging.info(f"Using included_folders filter: {included_folders}")
        else:
            s3_file_names = [
                content['Key'] for content in list_s3_files
                if any(content['Key'].endswith(ext) for ext in valid_extensions)
                and not any(content['Key'].startswith(folder) for folder in excluded_folders)
            ]
            logging.info("Using excluded_folders filter")
        # Check inventory code syntax: verify the base name (and folder name)
        # match the expected pattern, e.g. r'^[VA][OC]-[A-Z0-9]{3}-\d{5}_\d{2}$'.
        s3_validated_contents = []
        for s3file in s3_file_names:
            # s3_file_names contains the object keys (strings), not dicts.
            base_name = os.path.basename(s3file)
            logging.info(f"S3 Base name: {base_name}")
            # Extract the folder prefix and the media type embedded in the
            # inventory code (characters 4-6 of the base name).
            folder_prefix = os.path.dirname(s3file).rstrip('/')
            media_type_in_code = base_name[3:6] if len(base_name) >= 6 else None
            # Generic sanity check: folder name should equal the code's media type.
            is_valid_prefix = (folder_prefix == media_type_in_code)
            # Special folder allowance rules: some folders legitimately hold
            # codes of several media types.
            folder_allowances = {
                'DVD': ['DVD', 'BRD'],
                'FILE': ['M4V', 'AVI', 'MOV', 'MP4', 'MXF', 'AIF', 'WMV', 'M4A', 'MPG'],
            }
            if folder_prefix in folder_allowances:
                if media_type_in_code in folder_allowances[folder_prefix]:
                    is_valid_prefix = True
            if folder_prefix and media_type_in_code and not is_valid_prefix:
                logging.warning(f"Prefix mismatch for {s3file}: Folder '{folder_prefix}' does not match code type '{media_type_in_code}'")
                # Only a warning here — the file still goes through the
                # standard inventory-code validation below.
            if validate_inventory_code(base_name):  # truncated to first 12 chars inside the function
                logging.info(f"File {base_name} matches pattern.")
                # Only check the inventory-code extension for media files
                # (.mp4, .mp3); sidecars (.json, .pdf, .md5) only need their
                # base name validated.
                if s3file.lower().endswith(('.mp4', '.mp3')):
                    if not validate_icode_extension(s3file):
                        logging.warning(f"File {s3file} has invalid extension for its inventory code.")
                        continue  # skip adding this file to validated contents
                s3_validated_contents.append(s3file)
            else:
                # Pattern failed: analyze the base name to report which part broke.
                base_issues = analyze_pattern_match(base_name, "Base name")
                logging.warning(f"Base name '{base_name}' does not match pattern. Issues: {base_issues}")
                folder_name = os.path.dirname(s3file)
                logging.warning(f"File {s3file} in folder {folder_name} does not match pattern.")
        # ---------------------------------------------------------------------
        # PHASE 2: DATABASE CROSS-REFERENCE + FILTERING
        #
        # 1) Fetch existing filenames from the database.
        # 2) Skip files already represented in the DB (including sidecars).
        # 3) Produce the final list of S3 keys that should be parsed/inserted.
        # ---------------------------------------------------------------------
        logging.info("PHASE 2: Database cross-reference + filtering")
        # Get all DB filenames in one call.
        db_file_names = get_distinct_filenames_from_db()
        # Keep only files not already in the DB. Additionally, if the DB
        # already contains a sidecar record for the same basename (for
        # extensions .md5, .json, .pdf), skip the S3 file since the asset is
        # already represented in the DB via those sidecars.
        sidecar_exts = ('.md5', '.json', '.pdf')
        db_sidecar_basenames = set()
        for dbf in db_file_names:
            for ext in sidecar_exts:
                if dbf.endswith(ext):
                    db_sidecar_basenames.add(dbf[:-len(ext)])
                    break
        filtered_file_names = list_s3_not_in_db(s3_validated_contents, db_file_names, db_sidecar_basenames)
        # Log the totals before and after the DB filter.
        total_files_s3 = len(s3_validated_contents)
        logging.info(f"Total number of the valid (mp3,mp4,md5,json,pdf) files in the S3 bucket before DB filter: {total_files_s3}")
        total_files = len(filtered_file_names)
        logging.info(f"Total number of the valid (mp3,mp4,md5,json,pdf) files after DB filter: {total_files}")
        # Log the files that need to be inserted (those not yet in the DB).
        if total_files > 0:
            logging.info("List of files to be updated in the database:")
            for f in filtered_file_names:
                logging.info(f" - {f}")
        else:
            logging.info("No new files found to update in the database.")
        # Count files on S3 per extension (pre-DB-filter list).
        mp4_count = sum(1 for file in s3_file_names if file.endswith('.mp4'))
        mp3_count = sum(1 for file in s3_file_names if file.endswith('.mp3'))
        md5_count = sum(1 for file in s3_file_names if file.endswith('.md5'))
        pdf_count = sum(1 for file in s3_file_names if file.endswith('.pdf'))
        json_count = sum(1 for file in s3_file_names if file.endswith('.json'))
        mov_count = sum(1 for file in s3_file_names if file.endswith('.mov'))
        avi_count = sum(1 for file in s3_file_names if file.endswith('.avi'))
        m4v_count = sum(1 for file in s3_file_names if file.endswith('.m4v'))
        # Logger instance (currently unused; module-level logging.* calls are
        # used throughout instead).
        logger = logging.getLogger()
        logging.info("Number of .mp4 files on S3 bucket (%s): %s", bucket_name, mp4_count)
        logging.info("Number of .mp3 files on S3 bucket (%s): %s", bucket_name, mp3_count)
        logging.info("Number of .md5 files on S3 bucket (%s): %s", bucket_name, md5_count)
        logging.info("Number of .pdf files on S3 bucket (%s): %s", bucket_name, pdf_count)
        logging.info("Number of .json files on S3 bucket (%s): %s", bucket_name, json_count)
        logging.info("Number of .mov files on S3 bucket (%s): %s", bucket_name, mov_count)
        # Media-specific filename validation: every .mp4 base name should end
        # with '_H264' (checked inside validate_mp4_file in validation_utils.py).
        for file in s3_file_names:
            if file.endswith('.mp4'):
                validate_mp4_file(file)  # validation_utils.py - also checks _H264 suffix
            elif file.endswith('.mp3'):
                validate_mp3_file(file)  # validation_utils.py
        # Parity checks between media files and their sidecars.
        # NOTE(review): the original comment claimed these strict checks run
        # when ACH_SAFE_RUN is 'false', but the code below runs them when the
        # value is 'true' (the default). Confirm which behavior is intended.
        s3_files_filtered = []
        if os.getenv('ACH_SAFE_RUN', 'true') == 'true':
            if mp4_count != pdf_count:
                logging.error("Number of .mp4 files is not equal to number of .pdf files")
                # MOD 20251103: find the missing pdf/mp4 counterparts and
                # report them, using filtered_file_names. Stored as tuples
                # (source_file, expected_counterpart) for clearer logging.
                missing_pdfs = []  # list of (mp4_file, expected_pdf)
                missing_mp4s = []  # list of (pdf_file, expected_mp4)
                for file in filtered_file_names:
                    if file.endswith('.mp4'):
                        # Remove the extension robustly (preserves any path prefix).
                        base_name = os.path.splitext(file)[0]
                        # If the mp4 is an H264 variant (name_H264.mp4) drop the suffix.
                        if base_name.endswith('_H264'):
                            # NOTE: must also handle an extra trailing number
                            # for DBT/DVD/[FILE] codes — TODO confirm.
                            base_name = base_name[:-5]
                        expected_pdf = base_name + '.pdf'
                        if expected_pdf not in filtered_file_names:
                            missing_pdfs.append((file, expected_pdf))
                    elif file.endswith('.pdf'):  # check if the pdf has no .mp4
                        # Normalize the base name and look for the _H264 variant.
                        base_name = os.path.splitext(file)[0]
                        expected_mp4 = base_name + '_H264.mp4'
                        # If the expected mp4 variant doesn't exist, report missing.
                        if expected_mp4 not in filtered_file_names:
                            missing_mp4s.append((file, expected_mp4))
                        else:
                            s3_files_filtered.append(file)
                            continue
                # Report missing .pdf files.
                if missing_pdfs:
                    logging.error("Missing .pdf files (mp4 -> expected pdf):")
                    for mp4_file, expected_pdf in missing_pdfs:
                        logging.error("%s -> %s", _visible_spaces(mp4_file), _visible_spaces(expected_pdf))
                # Report missing .mp4 files.
                if missing_mp4s:
                    logging.error("Missing .mp4 files (pdf -> expected mp4):")
                    for pdf_file, expected_mp4 in missing_mp4s:
                        logging.error("%s -> %s", _visible_spaces(pdf_file), _visible_spaces(expected_mp4))
                logging.error("Abort Import Process due to missing files")
                raise ValueError("Inconsistent file counts mp4 vs pdf")
            if mp3_count + mp4_count != json_count:
                logging.error("Number of .mp3 files + number of .mp4 files is not equal to number of .json files")
                # TODO: report which specific files mismatch, as done for mp4/pdf.
                logging.error("Abort Import Process due to missing files")
                raise ValueError("Inconsistent file counts mp3+mp4 vs json")
            if mp3_count + mp4_count != md5_count:
                logging.error("Number of .mp3 files + number of .mp4 files is not equal to number of .md5 files")
                # TODO: report which specific files mismatch, as done for mp4/pdf.
                logging.error("Abort Import Process due to missing files")
                raise ValueError("Inconsistent file counts mp3+mp4 vs md5")
        # ---------------------------------------------------------------------
        # PHASE 3: PARSE & INSERT INTO DATABASE
        #
        # 1) Process each remaining S3 object and validate its metadata.
        # 2) Insert new records into the database (unless running in DRY_RUN).
        # 3) Report counts of successful uploads, warnings, and errors.
        # ---------------------------------------------------------------------
        logging.info("PHASE 3: Parse S3 objects and insert new records into the database")
        try:
            # If DRY RUN is enabled (the default), nothing is written to the DB.
            if os.getenv('ACH_DRY_RUN', 'true') == 'false':
                uploaded_files_count, warning_files_count, error_files_count = parse_s3_files(s3_client, filtered_file_names, ach_variables, excluded_folders)
            else:
                logging.warning("DRY RUN is set to TRUE - No files will be added to the database")
                # Zero out the counters so the summary logging below still works.
                uploaded_files_count, warning_files_count, error_files_count = (0, 0, 0)
            logging.info("Total number of files (mp3+mp4) with warnings: %s. (Probably already existing in the DB)", warning_files_count)
            logging.info("Total number of files with errors: %s", error_files_count)
            logging.info("Total number of files uploaded: %s", uploaded_files_count)
            logging.info("All files parsed")
        except Exception as e:
            logging.error(f"An error occurred while parsing S3 files: {e}")
            handle_general_error(e)
        # Check results: connect to the database and count stored files by
        # MIME type / extension so they can be compared against the S3 counts.
        conn = psycopg2.connect(**db_config)
        cur = conn.cursor()
        # Use centralized mime types from config (function-scope import keeps
        # this dependency local to the results check).
        from config import EXTENSION_MIME_MAP, MIME_TYPES
        logging.info(f"Mime types for counting files: {MIME_TYPES}")
        all_files_on_db = count_files(cur, MIME_TYPES, '*', False)
        mov_files_on_db = count_files(cur, ['video/mov'], '.mov', False)
        mxf_files_on_db = count_files(cur, ['application/mxf'], '.mxf', False)
        mpg_files_on_db = count_files(cur, ['video/mpeg'], '.mpg', False)
        avi_files_on_db = count_files(cur, ['video/x-msvideo'], '.avi', False)
        m4v_files_on_db = count_files(cur, ['video/mp4'], '.m4v', False)
        mp4_files_on_db = count_files(cur, ['video/mp4'], '.mp4', False)
        wav_files_on_db = count_files(cur, ['audio/wav'], '.wav', False)
        mp3_files_on_db = count_files(cur, ['audio/mp3'], '.mp3', False)
        # all = mov + m4v + avi + mxf + mpg (per MIME_TYPES above).
        logging.info(f"Number of all video files in the database: {all_files_on_db}")
        logging.info(f"Number of .mov files in the database: {mov_files_on_db} and S3: {mov_count} ")
        logging.info(f"Number of .mp4 files in the database: {mp4_files_on_db} and S3: {mp4_count}")
        # Compare mp4 names on S3 against the DB and report any missing ones.
        missing_mp4s = [f for f in s3_file_names if f.endswith('.mp4') and f not in db_file_names]
        # If missing_mp4s is empty, do not emit a warning.
        if missing_mp4s:
            logging.warning(f"Missing {len(missing_mp4s)} .mp4 files in DB compared to S3: {missing_mp4s}")
        logging.info(f"Number of .wav files in the database: {wav_files_on_db} ")
        logging.info(f"Number of .mp3 files in the database: {mp3_files_on_db} and S3: {mp3_count}")
        missing_mp3s = [f for f in s3_file_names if f.endswith('.mp3') and f not in db_file_names]
        # If missing_mp3s is empty, do not emit a warning.
        if missing_mp3s:
            logging.warning(f"Missing {len(missing_mp3s)} .mp3 files in DB compared to S3: {missing_mp3s}")
        logging.info(f"Number of .avi files in the database: {avi_files_on_db} ")
        logging.info(f"Number of .m4v files in the database: {m4v_files_on_db} ")
        logging.info(f"Number of .mxf files in the database: {mxf_files_on_db} ")
        logging.info(f"Number of .mpg files in the database: {mpg_files_on_db} ")
        logging.info(f"Total file in s3 before import {total_files}")
        # Record elapsed time for the summary email below.
        end_time = time.time()
        elapsed_time = end_time - start_time
        logging.info(f"Processing completed. Time taken: {elapsed_time:.2f} seconds")
    except FileNotFoundError as e:
        handle_file_not_found_error(e)
    except ValueError as e:
        handle_value_error(e)
    except Exception as e:
        handle_general_error(e)
    # Send a summary email with logs whether the run succeeded or failed.
    # Define the CET timezone used to timestamp rotated log filenames.
    cet = pytz.timezone('CET')

    # Helper: rename a non-empty log file by appending a timestamp and return
    # the new path; returns None when the file is missing/empty or the rename
    # fails (so the caller falls back to the original path).
    def _rename_log_if_nonempty(path):
        try:
            if not path or not os.path.exists(path):
                return None
            # If the file is empty, don't attach/rename it.
            if os.path.getsize(path) == 0:
                return None
            dir_name = os.path.dirname(path)
            base_name = os.path.splitext(os.path.basename(path))[0]
            timestamp = datetime.now(cet).strftime("%Y%m%d_%H%M%S")
            new_log_path = os.path.join(dir_name, f"{base_name}_{timestamp}.log")
            # Attempt to move/replace atomically where possible.
            try:
                os.replace(path, new_log_path)
            except Exception:
                # Fallback to rename (may raise on Windows if target exists).
                os.rename(path, new_log_path)
            return new_log_path
        except Exception as e:
            logging.error("Failed to rename log %s: %s", path, e)
            return None

    # Close logging to flush handlers before moving the log files.
    # NOTE(review): every logging.* call below runs AFTER logging.shutdown();
    # those messages may be lost or handled by a dead handler — confirm
    # whether shutdown should instead happen after the email is sent.
    logging.shutdown()
    logging.info("Preparing summary email")
    error_log = './logs/ACH_media_import_errors.log'
    warning_log = './logs/ACH_media_import_warning.log'
    # Determine presence of errors/warnings from the dedicated log files.
    has_errors = False
    has_warnings = False
    try:
        if os.path.exists(error_log) and os.path.getsize(error_log) > 0:
            with open(error_log, 'r', encoding='utf-8', errors='ignore') as f:
                content = f.read()
            # Any non-empty content counts as an error, not only 'ERROR' lines.
            if 'ERROR' in content or len(content.strip()) > 0:
                has_errors = True
        if os.path.exists(warning_log) and os.path.getsize(warning_log) > 0:
            with open(warning_log, 'r', encoding='utf-8', errors='ignore') as f:
                content = f.read()
            if 'WARNING' in content or len(content.strip()) > 0:
                has_warnings = True
    except Exception as e:
        logging.error("Error while reading log files: %s", e)

    # Read a comma-separated list from the environment, splitting safely and
    # stripping whitespace; empty entries are dropped.
    def _split_env_list(name):
        raw = os.getenv(name, '')
        return [s.strip() for s in raw.split(',') if s.strip()]

    EMAIL_RECIPIENTS = _split_env_list('EMAIL_RECIPIENTS')
    ERROR_EMAIL_RECIPIENTS = _split_env_list('ERROR_EMAIL_RECIPIENTS') or EMAIL_RECIPIENTS
    SUCCESS_EMAIL_RECIPIENTS = _split_env_list('SUCCESS_EMAIL_RECIPIENTS') or EMAIL_RECIPIENTS
    # Choose subject, body, attachment and recipients based on severity.
    if has_errors:
        subject = "ARKIVO Import of Video/Audio Ran with Errors"
        attachment_to_send = _rename_log_if_nonempty(error_log) or error_log
        body = "Please find the attached error log file. Job started at %s and ended at %s, taking %.2f seconds." % (
            datetime.fromtimestamp(start_time).strftime('%Y-%m-%d %H:%M:%S'),
            datetime.fromtimestamp(end_time).strftime('%Y-%m-%d %H:%M:%S'),
            elapsed_time
        )
        email_recipients = ERROR_EMAIL_RECIPIENTS
    elif has_warnings:
        subject = "ARKIVO Import of Video/Audio Completed with Warnings"
        # Attach the warnings log for investigation.
        attachment_to_send = _rename_log_if_nonempty(warning_log) or warning_log
        body = "The import completed with warnings. Please find the attached warning log. Job started at %s and ended at %s, taking %.2f seconds." % (
            datetime.fromtimestamp(start_time).strftime('%Y-%m-%d %H:%M:%S'),
            datetime.fromtimestamp(end_time).strftime('%Y-%m-%d %H:%M:%S'),
            elapsed_time
        )
        email_recipients = ERROR_EMAIL_RECIPIENTS
    else:
        subject = "ARKIVO Video/Audio Import Completed Successfully"
        # No attachment for a clean success.
        attachment_to_send = None
        body = "The import of media (video/audio) completed successfully without any errors or warnings. Job started at %s and ended at %s, taking %.2f seconds." % (
            datetime.fromtimestamp(start_time).strftime('%Y-%m-%d %H:%M:%S'),
            datetime.fromtimestamp(end_time).strftime('%Y-%m-%d %H:%M:%S'),
            elapsed_time
        )
        email_recipients = SUCCESS_EMAIL_RECIPIENTS
    logging.info("Sending summary email: %s (attach: %s)", subject, bool(attachment_to_send))
    # Send the email; failures are logged but do not raise.
    try:
        send_email_with_attachment(
            subject=subject,
            body=body,
            attachment_path=attachment_to_send,
            email_recipients=email_recipients
        )
    except Exception as e:
        logging.error("Failed to send summary email: %s", e)
    return
if __name__ == "__main__":
    try:
        # Setup logging using standard TimedRotatingFileHandler handlers.
        # No manual doRollover calls — rely on the handler's built-in rotation.
        logger, rotating_handler, error_handler, warning_handler = setup_logging()
        # Load configuration settings (AWS creds, DB connection, ACH options, bucket).
        aws_config, db_config, ach_config, bucket_name, ach_variables = config.load_config()
        logging.info("Config loaded, logging setup done")
        # Run the main process
        main_process(aws_config, db_config, ach_config, bucket_name, ach_variables)
        logging.info("Main process completed at: %s", datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
    except Exception as e:
        # Top-level boundary: logging.exception records the full traceback
        # (the previous logging.error lost it), which is essential when the
        # failure happens during logging/config setup. Lazy %-formatting
        # avoids building the message unless it is actually emitted.
        logging.exception("An error occurred: %s", e)