"""Validation helpers for inventory codes, media file lists and S3/DB keys."""

import logging
import os
import re
from typing import Iterable, List, Optional

# The base inventory code is exactly 12 characters: [VA][OC]-[3 chars]-[5 digits]
_BASE_CODE_LENGTH = 12
# 12-character base plus an extension of at most 5 characters
_MAX_CODE_LENGTH = 17
_BASE_CODE_PATTERN = r'^[VA][OC]-[A-Z0-9]{3}-\d{5}$'

# Extension pattern (including the leading underscore) for each 3-character
# type found at positions 4-6 of the base. Shared by the validator and the
# diagnostic helper so the two can never disagree.
_EXTENSION_PATTERNS = {
    'BTC': r'_\d{4}',
    'OA4': r'_\d{2}',
    'DVD': r'_\d{2}',
    'BRD': r'_\d{2}',
    'MCC': r'_[AB]',
    'DBT': r'_\d{4}',
    'M4V': r'_\d{2}',
    'AVI': r'_\d{2}',
    'MOV': r'_\d{2}',
    'MP4': r'_\d{2}',
    'MXF': r'_\d{2}',
    'MPG': r'_\d{2}',
}


def analyze_pattern_match(text: Optional[str], description: str) -> List[str]:
    """Report which positions of a 12-character base code break the pattern.

    The expected layout is ``[VA][OC]-[A-Z0-9]{3}-\\d{5}``. The length is
    checked first so that no position is indexed beyond the end of ``text``.

    Args:
        text: The (already truncated) 12-character base code.
        description: Label used in the message emitted for empty input.

    Returns:
        A list of human-readable issues; an empty list means every position
        matches the pattern.
    """
    if not text:
        return [f"{description}: Empty or None text"]

    if len(text) != _BASE_CODE_LENGTH:
        return [
            f"Length mismatch: expected {_BASE_CODE_LENGTH}, got {len(text)}"
        ]

    issues = []

    # Positions 1-3: V/A, O/C, dash
    if text[0] not in ('V', 'A'):
        issues.append(f"Position 1: Expected [V,A], got '{text[0]}'")
    if text[1] not in ('O', 'C'):
        issues.append(f"Position 2: Expected [O,C], got '{text[1]}'")
    if text[2] != '-':
        issues.append(f"Position 3: Expected '-', got '{text[2]}'")

    # Positions 4-6: upper-case letter or digit
    for i in range(3, 6):
        if not re.match(r'^[A-Z0-9]$', text[i]):
            issues.append(
                f"Position {i+1}: Expected [A-Z0-9], got '{text[i]}'"
            )

    # Position 7: dash
    if text[6] != '-':
        issues.append(f"Position 7: Expected '-', got '{text[6]}'")

    # Positions 8-12: digits. Use the same ``\d`` class as the validating
    # pattern; str.isdigit() also accepts characters such as superscripts
    # that ``\d`` rejects, which would leave a failed validation unexplained.
    for i in range(7, 12):
        if not re.fullmatch(r'\d', text[i]):
            issues.append(f"Position {i+1}: Expected digit, got '{text[i]}'")

    return issues


def validate_inventory_code(inventory_code: Optional[str]) -> bool:
    """Validate the 12-character base of an inventory code.

    Only the first 12 characters are checked; anything after them is ignored
    by this function. Failures are logged together with a per-position
    analysis from :func:`analyze_pattern_match`.

    Args:
        inventory_code: The inventory code to validate.

    Returns:
        True if the first 12 characters match the base pattern, False
        otherwise (including for empty or non-string input).
    """
    if not isinstance(inventory_code, str) or inventory_code == '':
        logging.warning(
            "Empty or non-string inventory code provided to validate_inventory_code"
        )
        return False

    truncated_base_name = inventory_code[:_BASE_CODE_LENGTH]
    logging.info("Inventory Code: %s", truncated_base_name)

    # fullmatch on the truncated string so the pattern covers exactly 12 chars
    if re.fullmatch(_BASE_CODE_PATTERN, truncated_base_name):
        logging.info("Inventory code '%s' is valid.", inventory_code)
        return True

    # Explain the failure position by position to make the log actionable
    for issue in analyze_pattern_match(truncated_base_name, "Base name"):
        logging.warning("Inventory code base issue: %s", issue)

    logging.warning("Inventory code '%s' is invalid.", inventory_code)
    return False


def _is_dry_run() -> bool:
    """Return True unless the ACH_DRY_RUN environment variable is 'false'."""
    return os.getenv('ACH_DRY_RUN', 'true').lower() != 'false'


def validate_icode_extension(file_inventory_code: Optional[str]) -> bool:
    """Validate an inventory code together with its optional extension.

    Before validation the input is reduced to its final path component, every
    ``_H264`` occurrence is removed when the name ends with ``.mp4``, and the
    file extension is stripped.

    Rules applied to the resulting code:
        - Total length must be <= 17 characters (12 base + up to 5 extension).
        - The first 12 characters must pass :func:`validate_inventory_code`.
        - The extension (characters after position 12) is optional. When
          present it must match the pattern registered for the type found in
          positions 4-6 of the base (for example 'BTC', 'OA4', 'DVD', 'MCC').

    Args:
        file_inventory_code: Full inventory code (base + optional extension),
            optionally with a folder prefix and/or file extension.

    Returns:
        True when the base is valid and the extension is absent or valid for
        the detected type; False otherwise. An over-length code also returns
        False while dry-run mode is active (the default).

    Raises:
        ValueError: If the code exceeds 17 characters and the environment
            variable ``ACH_DRY_RUN`` is set to ``false``.
    """
    if not isinstance(file_inventory_code, str) or file_inventory_code == '':
        logging.warning(
            "Empty or non-string inventory code provided to validate_icode_extension"
        )
        return False

    # security: ignore any folder prefix or path components that might be
    # passed in from external sources.
    code = os.path.basename(file_inventory_code)

    # _H264 is only stripped for .mp4 names; the file extension always is.
    if code.lower().endswith('.mp4'):
        code = code.replace("_H264", "")
    code = os.path.splitext(code)[0]

    if len(code) > _MAX_CODE_LENGTH:
        logging.warning(
            "Inventory code '%s' exceeds maximum allowed length (17).", code
        )
        # In dry-run mode the problem is only logged; otherwise it is fatal.
        if _is_dry_run():
            return False
        raise ValueError(
            "Inventory code with extension exceeds maximum length of 17 characters."
        )

    base = code[:_BASE_CODE_LENGTH]
    if not validate_inventory_code(base):
        logging.warning("Base inventory code '%s' is invalid.", base)
        return False

    support_type = base[3:6]
    extension = code[_BASE_CODE_LENGTH:]

    if extension == '':
        logging.info("Extension for '%s' is empty (valid).", code)
        return True

    expected_ext_pattern = _EXTENSION_PATTERNS.get(support_type)
    if expected_ext_pattern is None:
        logging.warning(
            "Unsupported type '%s' for extension validation in '%s'.",
            support_type, code,
        )
        return False

    if not re.fullmatch(expected_ext_pattern, extension):
        logging.warning(
            "Extension '%s' does not match expected pattern '%s' for type '%s'.",
            extension, expected_ext_pattern, support_type,
        )
        return False

    logging.info("Inventory code with extension '%s' is valid.", code)
    return True


def analyze_extension_pattern(file_inventory_code: Optional[str]) -> List[str]:
    """Collect diagnostics for the base and extension of a full inventory code.

    The input is analysed as given: no folder prefix or file extension is
    stripped. The first 12 characters are analysed with
    :func:`analyze_pattern_match` and the remainder is checked against the
    same per-type patterns used by :func:`validate_icode_extension`.

    This function does not raise; it only returns diagnostics.

    Args:
        file_inventory_code: Full inventory code (base + optional extension).

    Returns:
        A list of human-readable entries. Base problems are prefixed with
        ``"Base12: "``. A missing extension is reported as
        ``"No extension present"`` even though the validator accepts it.
    """
    if not isinstance(file_inventory_code, str) or file_inventory_code == '':
        return ["Empty or non-string inventory code"]

    base_part = file_inventory_code[:_BASE_CODE_LENGTH]
    extension_part = file_inventory_code[_BASE_CODE_LENGTH:]

    issues = [
        f"Base12: {it}" for it in analyze_pattern_match(base_part, "Base12")
    ]

    support_type = (
        file_inventory_code[3:6] if len(file_inventory_code) >= 6 else ''
    )

    if extension_part == '':
        issues.append("No extension present")
        return issues

    expected_ext = _EXTENSION_PATTERNS.get(support_type)
    if expected_ext is None:
        issues.append(
            f"Unsupported type '{support_type}' for extension validation"
        )
        return issues

    if not re.fullmatch(expected_ext, extension_part):
        issues.append(
            f"Extension '{extension_part}' does not match expected pattern "
            f"'{expected_ext}' for type '{support_type}'"
        )

    return issues


def _filter_by_suffix(file_list, suffix: str, label: str) -> List[str]:
    """Return the string entries of file_list ending with suffix (any case)."""
    valid_files = []
    try:
        for f in file_list:
            if not isinstance(f, str):
                logging.debug("Skipping non-string entry in file_list: %r", f)
                continue
            if f.lower().endswith(suffix):
                valid_files.append(f)
        return valid_files
    except Exception as e:
        # Deliberate best-effort: an unusable input yields an empty result.
        logging.error("Error validating %s files: %s", label, e)
        return []


def validate_mp4_file(file_list: Iterable[str]) -> List[str]:
    """Return the entries of file_list whose name ends with '.mp4'.

    This is intentionally lightweight: it only filters by extension
    (case-insensitively). Additional checks (metadata, codecs) should be
    implemented by callers if needed.

    Args:
        file_list: Iterable of file paths. Non-string entries are skipped.

    Returns:
        The accepted paths in their original order, or an empty list if
        iterating file_list fails.
    """
    return _filter_by_suffix(file_list, '.mp4', 'MP4')


def validate_mp3_file(file_list: Iterable[str]) -> List[str]:
    """Return the entries of file_list whose name ends with '.mp3'.

    Lightweight, case-insensitive filter by extension; further validation
    may be done elsewhere.

    Args:
        file_list: Iterable of file paths. Non-string entries are skipped.

    Returns:
        The accepted paths in their original order, or an empty list if
        iterating file_list fails.
    """
    return _filter_by_suffix(file_list, '.mp3', 'MP3')


def _as_lookup(values):
    """Return a container of values suited to repeated membership tests."""
    if values is None:
        return ()
    if isinstance(values, (set, frozenset, dict)):
        return values
    # Materialise once so one-shot iterators are not consumed by the first
    # membership test, then prefer a set for O(1) lookups.
    items = list(values)
    try:
        return set(items)
    except TypeError:
        return items


def list_s3_not_in_db(s3_list, db_list, db_sidecar_basenames) -> List[str]:
    """Return the S3 keys that are not represented in the database.

    A key is excluded when it appears verbatim in db_list, or when the key
    without its final extension appears in db_sidecar_basenames.

    Args:
        s3_list (Iterable[str]): S3 object keys to check. Order and
            duplicates are preserved in the result.
        db_list (Iterable[str]): Keys known to exist in the database.
        db_sidecar_basenames (Iterable[str] or set[str]): Keys without their
            final extension for which the database already has a sidecar.

    Returns:
        list[str]: Keys from s3_list present in neither collection. Matching
        is exact (case-sensitive) on the raw string values; paths are not
        normalized and no filesystem I/O is performed.

    Notes:
        - The extension is removed with os.path.splitext, which strips only
          the portion after the last dot: "foo.tar.gz" yields "foo.tar". Any
          directory part of the key is kept.
        - The inputs are not modified. Both lookup collections are converted
          to sets once (when their items are hashable), so membership tests
          are O(1) regardless of the container type passed in.

    Examples:
        >>> s3_list = ["a.jpg", "b.jpg", "c.txt"]
        >>> db_list = ["b.jpg"]
        >>> db_sidecar_basenames = {"c"}
        >>> list_s3_not_in_db(s3_list, db_list, db_sidecar_basenames)
        ['a.jpg']
    """
    db_keys = _as_lookup(db_list)
    sidecar_bases = _as_lookup(db_sidecar_basenames)

    file_names = []
    for f in s3_list:
        # exact key present in DB -> skip
        if f in db_keys:
            continue
        # DB already holds a sidecar for this key's basename -> skip
        if os.path.splitext(f)[0] in sidecar_bases:
            continue
        file_names.append(f)

    return file_names


def _run_self_tests():
    """Run the structured self-tests and print PASS/FAIL plus a summary.

    Returns:
        A ``(passed, failed)`` tuple of counters.
    """
    print("Running validation_utils structured tests...\n")

    # small helper to wrap callables that may raise
    def call_fn(fn):
        try:
            return (False, fn())
        except Exception as e:
            return (True, e)

    # Over-length codes raise only when dry-run mode is switched off, so the
    # expectation has to follow the environment the tests run in.
    too_long = (
        {"expect": False} if _is_dry_run() else {"expect_exception": ValueError}
    )

    cases = [
        # analyze_pattern_match -> expect lists (empty == no issues)
        {"name": "analyze_pattern_match valid",
         "fn": lambda: analyze_pattern_match("VO-DVD-12345", "Base"),
         "expect": []},
        {"name": "analyze_pattern_match invalid length",
         "fn": lambda: analyze_pattern_match("VO-DVD-1234", "Base"),
         "expect_non_empty": True},
        {"name": "analyze_pattern_match invalid chars",
         "fn": lambda: analyze_pattern_match("XO-DVD-12345", "Base"),
         "expect_non_empty": True},

        # validate_inventory_code
        {"name": "validate_inventory_code valid",
         "fn": lambda: validate_inventory_code("VO-DVD-12345"),
         "expect": True},
        {"name": "validate_inventory_code invalid prefix",
         "fn": lambda: validate_inventory_code("XO-DVD-12345"),
         "expect": False},
        {"name": "validate_inventory_code invalid format",
         "fn": lambda: validate_inventory_code("VO-DV-12345"),
         "expect": False},

        # validate_icode_extension (valid and invalid)
        {"name": "validate_icode_extension no ext",
         "fn": lambda: validate_icode_extension("VO-DVD-12345"),
         "expect": True},
        {"name": "validate_icode_extension BARE prefix",
         "fn": lambda: validate_icode_extension("BTC/VO-DVD-12345"),
         "expect": True},
        {"name": "validate_icode_extension BTC valid",
         "fn": lambda: validate_icode_extension("VO-BTC-12345_1234"),
         "expect": True},
        {"name": "validate_icode_extension DVD valid",
         "fn": lambda: validate_icode_extension("VO-DVD-12345_12"),
         "expect": True},
        {"name": "validate_icode_extension MCC valid",
         "fn": lambda: validate_icode_extension("VO-MCC-12345_A"),
         "expect": True},
        {"name": "validate_icode_extension unsupported type",
         "fn": lambda: validate_icode_extension("VO-XYZ-12345_12"),
         "expect": False},
        dict(name="validate_icode_extension too long extension",
             fn=lambda: validate_icode_extension("VO-DVD-12345_12345"),
             **too_long),
        dict(name="validate_icode_extension prefix too long",
             fn=lambda: validate_icode_extension("XYZ/VO-DVD-12345_12345"),
             **too_long),
        {"name": "validate_icode_extension BTC invalid pattern",
         "fn": lambda: validate_icode_extension("VO-BTC-12345_12"),
         "expect": False},

        # analyze_extension_pattern -> expect lists of diagnostics
        {"name": "analyze_extension_pattern valid",
         "fn": lambda: analyze_extension_pattern("VO-DVD-12345_12"),
         "expect": []},
        {"name": "analyze_extension_pattern no ext",
         "fn": lambda: analyze_extension_pattern("VO-DVD-12345"),
         "expect": ["No extension present"]},
        {"name": "analyze_extension_pattern bad ext",
         "fn": lambda: analyze_extension_pattern("VO-BTC-12345_12"),
         "expect_non_empty": True},

        # mp4/mp3 validators - return lists
        {"name": "validate_mp4_file valid list",
         "fn": lambda: validate_mp4_file(
             ["folder/VO-DVD-12345.mp4", "folder/movie_H264.mp4"]),
         "expect": ["folder/VO-DVD-12345.mp4", "folder/movie_H264.mp4"]},
        {"name": "validate_mp4_file no mp4s",
         "fn": lambda: validate_mp4_file(
             ["folder/readme.txt", "folder/image.jpg"]),
         "expect": []},
        {"name": "validate_mp3_file valid list",
         "fn": lambda: validate_mp3_file(["audio/VO-DVD-12345.mp3"]),
         "expect": ["audio/VO-DVD-12345.mp3"]},
        {"name": "validate_mp3_file no mp3s",
         "fn": lambda: validate_mp3_file(["audio/readme.md"]),
         "expect": []},

        # list_s3_not_in_db
        {"name": "list_s3_not_in_db some available",
         "fn": lambda: list_s3_not_in_db(
             ["a.mp4", "b.mp3", "c.pdf"], ["b.mp3"], {"a"}),
         "expect": ["c.pdf"]},
        {"name": "list_s3_not_in_db all filtered by sidecar",
         "fn": lambda: list_s3_not_in_db(["a.mp4"], [], {"a"}),
         "expect": []},
        {"name": "list_s3_not_in_db all filtered by exact db match",
         "fn": lambda: list_s3_not_in_db(["b.mp3"], ["b.mp3"], set()),
         "expect": []},
    ]

    passed = 0
    failed = 0

    for case in cases:
        name = case["name"]
        expect = case.get("expect", None)
        expect_non_empty = case.get("expect_non_empty", False)
        expect_exception = case.get("expect_exception", None)

        raised, result = call_fn(case["fn"])

        ok = False
        details = None

        if expect_exception is not None:
            # We expect an exception of a given type
            if raised and isinstance(result, expect_exception):
                ok = True
            else:
                got = 'no exception' if not raised else type(result).__name__
                details = (
                    f"expected exception {expect_exception.__name__}, got {got}"
                )
        elif raised:
            details = f"raised unexpected {type(result).__name__}: {result}"
        elif expect_non_empty:
            ok = bool(result)
            if not ok:
                details = f"expected non-empty result, got {result!r}"
        elif expect is not None:
            ok = (result == expect)
            if not ok:
                details = f"expected {expect!r}, got {result!r}"
        else:
            # fallback: treat truthiness as success
            ok = bool(result)

        if ok:
            print(f"PASS: {name}")
            passed += 1
        else:
            print(f"FAIL: {name} -> {details if details else result}")
            failed += 1

    print(
        f"\nTest summary: passed={passed}, failed={failed}, "
        f"total={passed+failed}"
    )
    return passed, failed


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    _run_self_tests()