""" db utils.py
|
|
Description:
|
|
This module provides utility functions for interacting with a PostgreSQL database using psycopg2.
|
|
It includes functions for checking inventory codes, adding file records and relationships,
|
|
retrieving support IDs, and executing queries.
|
|
Functions:
|
|
check_inventory_in_db(s3_client, cur, base_name):
|
|
Checks if the inventory code exists in the database and validates its format.
|
|
check_objkey_in_file_db(cur, base_name):
|
|
Checks if the object key exists in the database.
|
|
add_file_record_and_relationship(s3_client, cur, base_name, ach_variables):
|
|
Adds a file record and its relationships to the database and uploads a log file to S3.
|
|
add_file_record(cur, editor_id, approver_id, disk_size, file_availability_dict, notes, base_name, extension, storage_location, file_type, custom_data_in=None):
|
|
Adds a file record to the database.
|
|
add_file_support_relationship(cur, editor_id, approver_id, file_id, support_id, status):
|
|
Adds a relationship between a file and a support record in the database.
|
|
add_file_relationship(cur, file_a_id, file_b_id, file_relation_dict, custom_data, status, editor_id, approver_id):
|
|
Adds a relationship between two files in the database.
|
|
retrieve_support_id(cur, inventory_code):
|
|
Retrieves the support ID for a given inventory code from the database.
|
|
retrieve_digital_file_names(s3_client, cur, base_name, digital_file_name_in):
|
|
Retrieves digital file names from the database that match the given base_name.
|
|
get_db_connection(db_config):
|
|
Establishes a connection to the PostgreSQL database using the provided configuration.
|
|
execute_query(conn, query, params=None):
|
|
Executes a SQL query on the database.
|
|
"""
|
|
import psycopg2
|
|
from psycopg2 import sql
|
|
import logging
|
|
from datetime import datetime
|
|
import re
|
|
from error_handler import notify_error
|
|
from utils import check_audio_info
|
|
import json
|
|
import os
|
|
import config
|
|
from config import EXTENSION_MIME_MAP
|
|
|
|
def get_mime_for_extension(extension: str) -> str:
    """Map a file extension to a MIME type.

    The extension may be given with or without a leading dot; lookup is
    case-insensitive. Empty/None input and unknown extensions resolve to
    'application/octet-stream'.
    """
    fallback = 'application/octet-stream'
    if not extension:
        return fallback
    key = extension if extension.startswith('.') else f'.{extension}'
    return EXTENSION_MIME_MAP.get(key.lower(), fallback)
|
|
|
|
|
|
def get_mime_from_mediainfo(ach_variables: dict) -> str:
    """Determine a MIME type from the JSON sidecar mediainfo.

    This is used to capture the *master* format (conservatory copy) even when
    the stream copy on S3 is a different container (e.g., _H264.mp4).

    Rules:
    - If the file is outside `FILE/`, the master must be ProRes (MOV/QuickTime)
      for video inventory types; audio inventory types (OA4/MCC/DAT) are
      validated via check_audio_info instead. Non-ProRes video masters are a
      fatal error.
    - If the file is under `FILE/`, any format is acceptable; MIME is derived
      from the JSON metadata.

    If mediainfo is missing or cannot be mapped, fall back to extension-based
    mapping.

    Raises:
        ValueError: master validation failed (not ProRes, missing video track,
            or audio metadata rejected by check_audio_info).
    """
    # Determine whether this is a streaming file (FILE/) or a master file.
    file_fullpath = ach_variables.get('file_fullpath', '') or ach_variables.get('objectKeys', {}).get('media', '')
    file_fullpath_norm = file_fullpath.replace('\\', '/').lstrip('/')
    prefix = file_fullpath_norm.split('/', 1)[0].upper() if file_fullpath_norm else ''
    is_file_folder = (prefix == 'FILE')

    # Extract MediaInfo tracks. `or {}` guards a present-but-None custom_data_in.
    mediainfo = (ach_variables.get('custom_data_in') or {}).get('mediainfo', {})
    tracks = mediainfo.get('media', {}).get('track', [])
    # MediaInfo's JSON output emits a bare dict (not a one-element list) when
    # there is a single track; normalize so iteration always sees dicts.
    if isinstance(tracks, dict):
        tracks = [tracks]

    # ---- Master (outside FILE/) ----
    if not is_file_folder:
        # Determine if this is an audio-type inventory code (OA4/MCC/DAT) or video master.
        inventory_code = ach_variables.get('inventory_code', '') or ''
        inventory_type = inventory_code[3:6] if len(inventory_code) >= 6 else ''
        audio_inventory_types = {'OA4', 'MCC', 'DAT'}

        if inventory_type in audio_inventory_types:
            # For audio masters we validate the audio metadata (not ProRes video tracks).
            result, message = check_audio_info(mediainfo)
            if not result:
                raise ValueError(f"Audio validation failed: {message}")

            # Derive MIME from extension; fall back to the configured mapping.
            extension = os.path.splitext(file_fullpath_norm)[1].lower()
            return get_mime_for_extension(extension or ach_variables.get('extension'))

        # Otherwise, enforce a ProRes video track for video masters.
        video_track = next(
            (t for t in tracks if t.get('@type', '').lower() == 'video'), None
        )
        if not video_track:
            raise ValueError("Master file missing a Video track in mediainfo; cannot validate ProRes")

        format_value = (video_track.get('Format') or '').strip()
        if 'prores' not in format_value.lower():
            raise ValueError(f"Master file is not ProRes (Format='{format_value}').")

        # Strict rule: master is always QuickTime/ProRes
        return 'video/quicktime'

    # ---- FILE/ folder: do not enforce ProRes; derive MIME from JSON (or extension fallback) ----
    # Known MediaInfo "General" Format values, keyed lower-case for
    # case-insensitive lookup (replaces the original linear scan).
    general_format_mime = {
        'avi': 'video/x-msvideo',
        'mov': 'video/quicktime',
        'quicktime': 'video/quicktime',
        'mpeg-4': 'video/mp4',
        'mp4': 'video/mp4',
        'mxf': 'application/mxf',
        'mpeg': 'video/mpeg',
        'mpeg-ps': 'video/mpeg',
        'mpeg-ts': 'video/MP2T',
        'mpeg audio': 'audio/mpeg',
        'mpeg audio/layer 3': 'audio/mpeg',
        'aac': 'audio/aac',
        'pcm': 'audio/wav',
        'wav': 'audio/wav',
        'aiff': 'audio/aiff',
        'flac': 'audio/flac',
    }
    try:
        for track in tracks:
            if track.get('@type', '') == 'General':
                format_value = track.get('Format', '')
                if format_value:
                    fmt = format_value.lower()
                    if fmt in general_format_mime:
                        return general_format_mime[fmt]
                    # Fuzzy substring fallbacks, checked in the same order as before.
                    if 'avi' in fmt:
                        return 'video/x-msvideo'
                    if 'mp4' in fmt:
                        return 'video/mp4'
                    if 'mpeg' in fmt:
                        return 'video/mpeg'
                    if 'wav' in fmt or 'pcm' in fmt:
                        return 'audio/wav'
                    if 'mp3' in fmt:
                        return 'audio/mpeg'
        # Fall back to extension-based mapping when metadata doesn't yield a mime
        return get_mime_for_extension(ach_variables.get('extension'))
    except Exception:
        # Best-effort: any metadata-shape surprise degrades to the extension mapping.
        return get_mime_for_extension(ach_variables.get('extension'))
|
|
|
|
def get_distinct_filenames_from_db():
    """Retrieve distinct digital file names from the Postgres DB.

    Loads DB configuration via `config.load_config()`, opens a connection with
    `get_db_connection(db_config)`, and returns the set of distinct
    `digital_file_name` values ending in .mp3/.mp4/.md5/.json/.pdf.

    Returns:
        set[str]: distinct non-NULL digital file names.

    Raises:
        RuntimeError: when DB configuration cannot be loaded (original cause chained).
    """
    # load_config returns (aws_config, db_config, ach_config, bucket_name, ach_variables)
    try:
        _, db_config, _, _, _ = config.load_config()
    except Exception as exc:
        # Re-raise with a clearer message while preserving the original cause.
        raise RuntimeError("Unable to load DB configuration via config.load_config()") from exc

    conn = get_db_connection(db_config)
    try:
        with conn.cursor() as cur:
            cur.execute(
                """SELECT DISTINCT digital_file_name
                   FROM file
                   WHERE digital_file_name ~ '\\.(mp3|mp4|md5|json|pdf)$';"""
            )
            rows = cur.fetchall()
            # Flatten list of tuples -> simple set of names
            return {row[0] for row in rows if row[0] is not None}
    finally:
        conn.close()
|
|
|
|
# Function to check if the inventory code exists in the database
def check_inventory_in_db(s3_client, cur, base_name):
    """Validate the inventory-code format of *base_name* and look it up in `support`.

    The code is truncated to a typology-dependent length (OA4: 15 chars with a
    trailing _NN; MCC: 14 chars with a trailing _A/_B; default: 12 chars) and
    matched against a regex, then its media typology is checked against the
    allowed audio/video lists, and finally the first 12 characters are looked
    up (prefix LIKE) in the `support` table.

    Args:
        s3_client: S3 client (unused here; kept for interface compatibility).
        cur: database cursor.
        base_name (str): file base name starting with the inventory code.

    Returns:
        tuple: (True, truncated_base_name) when valid and present in the DB,
        otherwise (False, None).

    Raises:
        Exception: unexpected DB/driver errors (reported via notify_error first).
    """
    logging.debug("Executing check_inventory_in_db")

    # Allowed audio typologies (positions 3:6 of the code).
    media_tipology_A = ['MCC', 'OA4', 'DAT']
    media_tipology_A += ['M4A', 'AIF']  # added for "FILE" folders 04112025

    # Allowed video typologies.
    media_tipology_V = [
        'OV1', 'OV2', 'UMT', 'VHS', 'HI8', 'VD8', 'BTC', 'DBT', 'IMX', 'DVD',
        'CDR', 'MDV', 'DVC', 'HDC', 'BRD', 'CDV'
    ]
    media_tipology_V += ['AVI', 'M4V', 'MOV', 'MP4', 'MXF', 'MPG', 'WMV']  # added for "FILE" folders 04112025

    try:
        logging.info(f"SUPPORT TYPOLOGY : {base_name[3:6]}")
        if base_name[3:6] == 'OA4':
            pattern = r'^[VA][OC]-[A-Z0-9]{3}-\d{5}_\d{2}$'  # include the _\d{2} for OA4
            truncated_base_name = base_name[:15]
            logging.info(f"type is OA4: {truncated_base_name}")
        elif base_name[3:6] == 'MCC':
            pattern = r'^[VA][OC]-[A-Z0-9]{3}-\d{5}_[AB]$'  # include the _[AB] for MCC
            truncated_base_name = base_name[:14]
            logging.info(f"type is MCC: {truncated_base_name}")
        else:
            pattern = r'^[VA][OC]-[A-Z0-9]{3}-\d{5}$'
            truncated_base_name = base_name[:12]
            logging.info(f"type is default: {truncated_base_name}")

        logging.info(f"Checking inventory code {truncated_base_name} with pattern {pattern}...")
        # Validate the string; any validation failure yields (False, None)
        # rather than propagating.
        try:
            if not re.match(pattern, truncated_base_name):
                error_message = f"Invalid format for base_name {truncated_base_name}"
                logging.error(error_message)
                raise ValueError(error_message)

            # Extract the first character and the 3 central characters.
            first_char = truncated_base_name[0]
            central_chars = truncated_base_name[3:6]

            # Check the typology list that corresponds to the leading A/V character.
            if first_char == 'A':
                if central_chars not in media_tipology_A:
                    logging.error(f"Invalid media tipology for base_name {truncated_base_name}")
                    return False, None
            elif first_char == 'V':
                if central_chars not in media_tipology_V:
                    logging.error(f"Invalid media tipology for base_name {truncated_base_name}")
                    return False, None
            else:  # Invalid first character
                logging.error(f"Invalid first character for base_name {truncated_base_name}")
                return False, None

            logging.info(f"Valid format for base_name {truncated_base_name}")
        except ValueError as e:
            logging.error(f"Caught a ValueError: {e}")
            return False, None
        except Exception as e:
            logging.error(f"Caught an unexpected exception: {e}")
            return False, None

        # Check if the truncated base_name matches an inventory code in the support table.
        check_query = sql.SQL("""
            SELECT 1
            FROM support
            WHERE inventory_code LIKE %s
            LIMIT 1;
        """)
        cur.execute(check_query, (f"{truncated_base_name[:12]}%",))
        result = cur.fetchone()

        if result:
            logging.info(f"Inventory code {truncated_base_name[:12]} found in the database.")
            return True, truncated_base_name
        else:
            logging.info(f"Inventory code {truncated_base_name} not found in the database.")
            notify_error(f"Inventory code {truncated_base_name} not found in the database.")
            return False, None

    except Exception as e:
        notify_error(f'Error checking inventory code {base_name}', e)
        raise  # bare raise preserves the original traceback
|
|
|
|
# Function to check if the object key exists in the database
def check_objkey_in_file_db(cur, base_name):
    """Check whether *base_name* matches a digital_file_name in the file table.

    Args:
        cur: database cursor.
        base_name (str): base name used as a prefix for the LIKE match.

    Returns:
        bool: True when at least one matching row exists, False otherwise.
    """
    logging.debug("Executing check_objkey_in_file_db")

    try:
        # Prefix match of base_name against file.digital_file_name.
        lookup = sql.SQL("""
            SELECT 1
            FROM file
            WHERE digital_file_name LIKE %s
            LIMIT 1;
        """)
        cur.execute(lookup, (f"{base_name}%",))
        found = cur.fetchone() is not None

        if found:
            logging.info(f"Inventory code {base_name} found in the database.")
        else:
            logging.info(f"Inventory code {base_name} not found in the database.")
        return found

    except Exception as e:
        notify_error(f"Error checking inventory code {base_name}", e)
        raise e
|
|
|
|
# Function to add a file record and its relationship to the support record
def add_file_record_and_relationship(s3_client, cur, base_name,ach_variables):
    """
    Adds a file record and its relationships to the database.

    Steps:
    1. Loads configuration from the .env file.
    2. Retrieves the support ID for the given base name.
    3. Adds a new file record for the conservative copy (master).
    4. Adds a relationship between the new file and the support ID.
    5. If the file extension is .mp4 or .mp3, adds a streaming file record, its
       support relationship, and a "re-encoding of master" file relationship.
    6. If the extension is .mp4 and a PDF exists (pdf_disk_size > 0), adds a PDF
       file record and a "documentation of master" relationship.
    (The final S3 log upload is currently disabled — see commented code below.)

    Args:
        s3_client (boto3.client): S3 client (currently unused; kept for the disabled log upload).
        cur (psycopg2.cursor): database cursor used to execute SQL.
        base_name (str): base name of the file (used for logging only).
        ach_variables (dict): pipeline state; mutated in place (objectKeys,
            file_copia_conservativa, custom_data_in['media_usage']).

    Returns:
        bool: True if all records/relationships were added, False when no
        support_id was found.

    Raises:
        Exception: any DB error is reported via notify_error and re-raised.
    """
    # Load the configuration from the .env file
    aws_config, db_config, ach_config, bucket_name, _ = config.load_config()

    editor_id = ach_config['ach_editor_id']
    approver_id = ach_config['ach_approver_id']
    # Append current date and time to notes in format: yyyy mm dd HH MM SS
    now_dt = datetime.now()
    date_part = now_dt.strftime('%Y %m %d')
    time_part = now_dt.strftime('%H %M %S')
    notes = f"{ach_config.get('ach_notes','') } {date_part} {time_part}"

    # The master ("copia conservativa") path comes from the mediainfo @ref field.
    ach_variables['file_copia_conservativa'] = ach_variables['custom_data_in'].get('mediainfo', {}).get("media", {}).get("@ref", "")
    logging.info(f"ach_variables['file_copia_conservativa']: {ach_variables['file_copia_conservativa']}")

    logging.debug("Executing add_file_record_and_relationship")

    try:
        # Retrieve the support ID for the given base name
        support_id = retrieve_support_id(cur, ach_variables['inventory_code'])
        if support_id:
            logging.info(f"Found support_id {support_id} for base_name {base_name}.")

            # The conservative copy key must not carry the _H264 stream suffix.
            ach_variables['objectKeys']['conservative_copy'] = ach_variables['file_copia_conservativa']
            ach_variables['objectKeys']['conservative_copy'] = ach_variables['objectKeys']['conservative_copy'].replace('_H264', '')

            # Add a new file record for the "copia conservativa" (master).
            file_availability_dict = 7  # Place Holder
            ach_variables['custom_data_in']['media_usage'] = 'master'  # can be "copia conservativa"
            # Determine master mime type using the JSON sidecar metadata (preferred).
            master_mime_type = get_mime_from_mediainfo(ach_variables)

            new_file_id = add_file_record(
                cur,
                editor_id,
                approver_id,
                ach_variables['disk_size'],
                file_availability_dict,
                notes,
                ach_variables['objectKeys']['conservative_copy'],
                ach_variables['conservative_copy_extension'],
                ach_config['ach_storage_location'],
                master_mime_type,
                ach_variables['custom_data_in']
            )

            if new_file_id:
                logging.info(f"Added file record for {base_name} with file_id {new_file_id}.")
                # Link the master file to the support record.
                status = '{"saved": true, "status": "approved"}'
                add_file_support_relationship(cur, editor_id, approver_id, new_file_id, support_id, status)

            # If the file extension is .mp4/.mp3, add a streaming file record.
            mime_type = get_mime_for_extension(ach_variables.get('extension'))
            if ach_variables['extension'] == '.mp4' or ach_variables['extension'] == '.mp3':
                file_availability_dict = 8  # Hot Storage
                mp4_file_id = add_file_record(
                    cur,
                    editor_id,
                    approver_id,
                    ach_variables['media_disk_size'],
                    file_availability_dict,
                    notes,
                    ach_variables['objectKeys']['media'],
                    ach_variables['extension'],  # deprecated
                    {"storage_type": "s3", "storage_location_id": 5},
                    mime_type,
                    {"media_usage": "streaming"}
                )
                if mp4_file_id:
                    logging.info(f"Added MP4/MP3 file record for {base_name} with file_id {mp4_file_id}.")
                    # Link the streaming file to the support record.
                    status = '{"saved": true, "status": "approved"}'
                    add_file_support_relationship(cur, editor_id, approver_id, mp4_file_id, support_id, status)

                    # Link streaming file (mp4_file_id) to the master file (new_file_id).
                    status = '{"saved": true, "status": "approved"}'
                    file_relation_dict = 10  # relationship dictionary: "è re-encoding di master"
                    add_file_relationship(cur, new_file_id, mp4_file_id, file_relation_dict, '{}', status, editor_id, approver_id)

                # The .mp4 should also have the QC report in PDF format:
                # add the PDF as documentation related to the MASTER file.
                if ach_variables['extension'] == '.mp4' and ach_variables['pdf_disk_size'] > 0:
                    file_availability_dict = 8  # Hot Storage
                    pdf_file_id = add_file_record(
                        cur,
                        editor_id,
                        approver_id,
                        ach_variables['pdf_disk_size'],
                        file_availability_dict,
                        notes,
                        ach_variables['objectKeys']['pdf'],
                        '.pdf',
                        {"storage_type": "s3", "storage_location_id": 5},
                        "application/pdf",
                        {"media_usage": "documentation"}
                    )
                    if pdf_file_id:
                        logging.info(f"Added PDF file record for {base_name} with file_id {pdf_file_id}.")
                    # If both MP4 and PDF file IDs exist, add a relationship between them.
                    if ach_variables['extension'] == '.mp4' and pdf_file_id:
                        file_relation_dict = 11  # relationship dictionary: "è documentazione di master"
                        custom_data = '{}'  # additional custom data if ever needed
                        status = '{"saved": true, "status": "approved"}'
                        add_file_relationship(cur, new_file_id, pdf_file_id, file_relation_dict, '{}', status,
                                              editor_id, approver_id)

            # If everything is successful, upload the log file to S3 (currently disabled).
            # log_file_path = ach_variables['file_fullpath'] + base_name + '.log'
            # logging.info(f"Uploading log file {log_file_path} to S3...")
            # log_data = "import successful"
            # log_to_s3(s3_client, bucket_name, log_file_path, log_data)
            # logging.info(f"Log file {log_file_path} uploaded to S3.")
            return True
        else:
            logging.error(f"No support_id found for base_name {base_name}.")
            return False

    except Exception as e:
        notify_error(f"Error adding file record and relationship: {base_name}", e)
        raise e
|
|
|
|
# Function to add a file record
def add_file_record(cur, editor_id, approver_id, disk_size, file_availability_dict, notes, base_name, extension, storage_location, file_type, custom_data_in=None):
    """Insert a file record via the public.add_file02 DB function.

    Args:
        cur: database cursor.
        editor_id / approver_id: user ids recorded on the row.
        disk_size: size in bytes.
        file_availability_dict: availability dictionary id.
        notes (str): free-text notes.
        base_name (str): used as both digital and original file name.
        extension (str): unused here (name already carries it); kept for interface compatibility.
        storage_location: JSON-serializable storage descriptor.
        file_type (str): MIME type, stored as {"type": ...}.
        custom_data_in (dict | None): extra JSON metadata.

    Returns:
        The new file id returned by add_file02, or None when nothing was returned.
    """
    try:
        cur.execute("""
            SELECT public.add_file02(
                %s,  -- editor_id_in
                %s,  -- approver_id_in
                %s,  -- disk_size_in
                %s,  -- file_availability_dict_in
                %s,  -- notes_in
                %s,  -- digital_file_name_in
                %s,  -- original_file_name_in
                %s,  -- status_in
                %s,  -- storage_location_in
                %s,  -- file_type_in
                %s   -- custom_data_in
            )
        """, (
            editor_id,
            approver_id,
            disk_size,
            file_availability_dict,
            notes,
            f"{base_name}",  # digital_file_name_in
            f"{base_name}",  # original_file_name_in
            '{"saved": true, "status": "approved"}',  # status_in
            json.dumps(storage_location),  # storage_location_in
            json.dumps({"type": file_type}),  # file_type_in
            json.dumps(custom_data_in)  # custom_data_in
        ))

        # Fetch the id returned by the DB function.
        file_result = cur.fetchone()
        return file_result[0] if file_result else None

    except Exception as e:
        # Lazy %-formatting: the original f-string call passed `e` as an unused
        # logging argument, which broke log formatting.
        logging.error('Error adding file record: %s', e)
        raise
|
|
|
|
# Function to add a file-support relationship
def add_file_support_relationship(cur, editor_id, approver_id, file_id, support_id, status):
    """Link a file record to a support record via public.add_rel_file_support.

    Args:
        cur: database cursor.
        editor_id / approver_id: user ids recorded on the relationship.
        file_id: id of the file row.
        support_id: id of the support row.
        status (str): JSON status string.

    Returns:
        None. The stored procedure produces no result set.
    """
    try:
        # Call the stored procedure with named parameters.
        cur.execute("""
            CALL public.add_rel_file_support(
                editor_id_in := %s,
                approver_id_in := %s,
                file_id_in := %s,
                support_id_in := %s,
                status_in := %s
            )
        """, (
            editor_id,    # editor_id_in
            approver_id,  # approver_id_in
            file_id,      # file_id_in
            support_id,   # support_id_in
            status        # status_in
        ))

        logging.info(f"Added file support relationship for file_id {file_id}.")

    except Exception as e:
        # Lazy %-formatting: the original f-string call passed `e` as an unused
        # logging argument, which broke log formatting.
        logging.error('Error adding file support relationship: %s', e)
        raise
|
|
|
|
# Function to add a file-to-file relationship
def add_file_relationship(cur, file_a_id, file_b_id, file_relation_dict, custom_data, status, editor_id, approver_id):
    """Link two file records via the public.add_rel_file stored procedure.

    Args:
        cur: database cursor.
        file_a_id / file_b_id: ids of the two file rows being related.
        file_relation_dict: relationship dictionary id.
        custom_data (str): JSON custom-data string.
        status (str): JSON status string.
        editor_id / approver_id: user ids recorded on the relationship.

    Returns:
        None. The stored procedure produces no result set.
    """
    call_sql = """
            CALL public.add_rel_file(
                editor_id_in := %s,
                approver_id_in := %s,
                file_a_id_in := %s,
                file_b_id_in := %s,
                file_relation_dict_in := %s,
                status_in := %s,
                custom_data_in := %s
            )
        """
    call_args = (
        editor_id,
        approver_id,
        file_a_id,
        file_b_id,
        file_relation_dict,
        status,
        custom_data,
    )
    try:
        cur.execute(call_sql, call_args)
        logging.info(f"Added file relationship between file_id {file_a_id} and file_id {file_b_id}.")
    except Exception as e:
        logging.error(f'Error adding file relationship: {e}', exc_info=True)
        raise e
|
|
|
|
# Function to retrieve the support ID for a given inventory code
def retrieve_support_id(cur, inventory_code):
    """Return the newest support id matching *inventory_code* (prefix LIKE).

    Only audio/video supports are considered; MAX(id) per h_base_record_id
    picks the most recent revision, and only the first row is read.

    Args:
        cur: database cursor.
        inventory_code (str): inventory code prefix to match.

    Returns:
        The support id, or None when no (non-falsy) id was found.
    """
    try:
        cur.execute("""
            SELECT MAX(s.id) AS id
            FROM support s
            WHERE s.inventory_code LIKE %s::text
              AND (s.support_type ->> 'type' LIKE 'video' OR s.support_type ->> 'type' LIKE 'audio')
            GROUP BY s.h_base_record_id
        """, (f"{inventory_code}%",))

        support_result = cur.fetchone()
        logging.info(f"support_result: {support_result[0] if support_result and support_result[0] else None}")
        return support_result[0] if support_result and support_result[0] else None

    except Exception as e:
        # Lazy %-formatting: the original f-string call passed `e` as an unused
        # logging argument, which broke log formatting.
        logging.error('Error retrieving support_id: %s', e)
        raise
|
|
|
|
# Function to retrieve digital_file_name from the database
def retrieve_digital_file_names(s3_client, cur, base_name, digital_file_name_in):
    """Check whether *digital_file_name_in* is already recorded for this support.

    Fetches all file names linked to the support matching base_name[:12]
    (plus any ILIKE matches on the name itself) and compares their basenames,
    extension stripped, against digital_file_name_in.

    Args:
        s3_client: S3 client (unused here; kept for interface compatibility).
        cur: database cursor.
        base_name (str): inventory-code-bearing base name (first 12 chars used).
        digital_file_name_in (str): candidate file name to look for.

    Returns:
        bool: False when a matching record already exists (do not add),
        True when no match was found (safe to add a new record).
    """
    try:
        logging.info(f"Retrieving digital file names for inventory code {base_name}... and digital_file_name {digital_file_name_in}")
        # Retrieve every digital_file_name linked to the support, or matching by name.
        query = sql.SQL("""
            WITH supprtId AS (
                SELECT s.id
                FROM support s
                WHERE s.inventory_code LIKE %s
            ), rel_fs AS (
                SELECT rfs.file_id
                FROM rel_file_support rfs
                WHERE rfs.support_id IN (SELECT id FROM supprtId)
            )
            SELECT f.id AS file_id, f.digital_file_name
            FROM file f
            WHERE f.id IN (SELECT file_id FROM rel_fs)
               OR f.digital_file_name ILIKE %s
        """)

        cur.execute(query, (f"{base_name[:12]}", f"{digital_file_name_in}.%"))
        results = cur.fetchall()

        for result in results:
            logging.info(f"File ID: {result[0]}, Digital File Name: {result[1]}")

        # Filter results whose basename (extension stripped) equals the target's.
        # Hoisted: the target stem is loop-invariant.
        target_stem = os.path.splitext(os.path.basename(digital_file_name_in))[0]
        matching_files = []
        for row in results:
            digital_file_name = row[1]  # second column: digital_file_name
            base_name_from_file = os.path.splitext(os.path.basename(digital_file_name))[0]
            logging.info(f"base_name_from_file: {base_name_from_file}")
            if base_name_from_file == target_stem:
                matching_files.append(digital_file_name)

        logging.info(f"Matching digital file names: {matching_files}")
        if matching_files:
            logging.info(f"Found the following matching digital file names: {matching_files} do not add")
            return False
        else:
            logging.info(f"No matching digital file names found for inventory code {base_name}. try to add new record")
            return True

    except Exception as e:
        # Lazy %-formatting: the original f-string call passed `e` as an unused
        # logging argument, which broke log formatting.
        logging.error('Error retrieving digital file names for inventory code %s: %s', base_name, e)
        raise
|
|
|
|
# Function to get a db connection
def get_db_connection(db_config):
    """Open and return a psycopg2 connection built from *db_config* kwargs.

    Raises:
        psycopg2.Error: when the connection cannot be established (logged first).
    """
    try:
        return psycopg2.connect(**db_config)
    except psycopg2.Error as e:
        logging.error(f"Error connecting to the database: {e}")
        raise e
|
|
|
|
# Function to execute a query
def execute_query(conn, query, params=None):
    """Run *query* (with optional *params*) on *conn* and commit.

    Raises:
        psycopg2.Error: on execution failure (logged first).
    """
    try:
        with conn.cursor() as active_cursor:
            active_cursor.execute(query, params)
            conn.commit()
    except psycopg2.Error as e:
        logging.error(f"Error executing query: {e}")
        raise e
|
|
|
|
# Function to count files in the database
def count_files(cur, mime_type, extension='*', file_dir=True):
    """Count distinct file base records matching the given filters.

    Args:
        cur: database cursor.
        mime_type: a single MIME type, a list/tuple of MIME types, or a falsy
            value to skip the MIME filter.
        extension (str): filename suffix filter; '*' (or None) matches any.
        file_dir (bool): True counts only rows whose original_file_name starts
            with 'FILE'; False counts only rows that do not.

    Returns:
        int: COUNT(DISTINCT h_base_record_id), 0 when the cursor yields no row.

    Raises:
        Exception: DB errors are logged (with the built query) and re-raised.
    """
    query = (
        "SELECT COUNT(DISTINCT h_base_record_id) "
        "FROM file "
        "WHERE file_type ->> 'type' IS NOT NULL"
    )
    args = ()

    # Handle mime_type list: expand into ARRAY[...] with one placeholder each.
    if mime_type:
        if isinstance(mime_type, (list, tuple)):
            placeholders = ','.join(['%s'] * len(mime_type))
            query += f" AND file_type ->> 'type' = ANY(ARRAY[{placeholders}])"
            args += tuple(mime_type)
        else:
            query += " AND file_type ->> 'type' = %s"
            args += (mime_type,)

    # Add extension condition unless it is the '*' wildcard (or None).
    if extension != '*' and extension is not None:
        query += " AND digital_file_name ILIKE %s"
        args += (f'%{extension}',)

    # Restrict to (or exclude) files whose original name starts with 'FILE'.
    if file_dir:
        query += " AND original_file_name ILIKE %s"
    else:
        query += " AND original_file_name NOT ILIKE %s"
    args += ('FILE%',)

    try:
        logging.debug("Executing count_files SQL: %s -- args: %s", query, args)
        cur.execute(query, args)
        result = cur.fetchone()
        # fetchone() may return None; treat "no row" as a zero count.
        if not result:
            return 0
        return result[0]
    except Exception:
        logging.exception('Error executing count_files with query: %s', query)
        raise
|
|
|
|
|