Source code for src.batch_processing

import os
import json
import pandas as pd
import concurrent.futures

from .input import load_data
from .validation import DataValidator
from .mapping import OntologyMapper
from .missing_data import detect_missing_data, flag_missing_data_records, impute_missing_data
from .reporting import generate_qc_report, create_visual_summary
from .configuration import load_config
from .logging_module import log_activity, setup_logging
from tqdm import tqdm
import hashlib

[docs]def child_process_run( file_path, schema, ontology_mapper, unique_identifiers, custom_mappings, impute_strategy, field_strategies, output_dir, target_ontologies, report_format, chunksize, phenotype_columns, log_file_for_children ): """ This top-level function is what each child process calls. We do the logging re-init in append mode, then run process_file. """ setup_logging(log_file=log_file_for_children, mode='a') return process_file( file_path=file_path, schema=schema, ontology_mapper=ontology_mapper, unique_identifiers=unique_identifiers, custom_mappings=custom_mappings, impute_strategy=impute_strategy, field_strategies=field_strategies, output_dir=output_dir, target_ontologies=target_ontologies, report_format=report_format, chunksize=chunksize, phenotype_columns=phenotype_columns )
[docs]def unique_output_name(file_path, output_dir, suffix='.csv'): """ Creates a unique output filename using: - The original file's *base name* (not the entire path), - A short 5-char hash based on that base name (to avoid collisions), - The original extension (e.g. .json -> '_json'), - And finally the desired suffix (.csv, _report.pdf, etc.). """ just_name = os.path.basename(file_path) short_hash = hashlib.md5(just_name.encode('utf-8')).hexdigest()[:5] base_no_ext, orig_ext = os.path.splitext(just_name) ext_no_dot = orig_ext.lstrip('.') # e.g. "json" final_name = f"{base_no_ext}_{short_hash}_{ext_no_dot}{suffix}" return os.path.join(output_dir, final_name)
[docs]def convert_nans_to_none_for_string_cols(df, schema): """ Converts NaN to None for columns declared as type=["string","null"] (or "string") in the JSON schema. This ensures row-level validation won't flag them as float('NaN'). """ df_converted = df.copy(deep=True) props = schema.get("properties", {}) for col, rules in props.items(): declared_type = rules.get("type") if isinstance(declared_type, str) and declared_type == "string": pass_types = ["string"] elif isinstance(declared_type, list) and "string" in declared_type: pass_types = declared_type else: continue if col in df_converted.columns: df_converted[col] = df_converted[col].where(df_converted[col].notna(), None) return df_converted
[docs]def get_file_type(file_path): """ Returns 'csv', 'tsv', or 'json' depending on the file extension. Raises ValueError if unsupported. """ _, ext = os.path.splitext(file_path.lower()) if ext == '.csv': return 'csv' elif ext == '.tsv': return 'tsv' elif ext == '.json': return 'json' else: raise ValueError(f"Unsupported file extension: {ext}")
[docs]def process_file( file_path, schema, ontology_mapper, unique_identifiers, custom_mappings=None, impute_strategy='mean', field_strategies=None, output_dir='reports', target_ontologies=None, report_format='pdf', chunksize=10000, phenotype_columns=None ): """ Processes a single file, generating an output CSV and a PDF/MD report. We only changed how we build the final filenames and how we display the file name in the PDF's "Source file" reference. """ file_type = get_file_type(file_path) log_activity(f"[ChildProcess] Starting on: {file_path}", level='info') final_status = 'Processed' error_msg = None try: with tqdm(total=100, desc=f"Processing {os.path.basename(file_path)}") as pbar: # 1) Attempt data loading try: data_iterator = load_data(file_path, file_type, chunksize=chunksize) except Exception as e: final_status = 'ProcessedWithWarnings' error_msg = f"Could not load data from {file_path}: {str(e)}" log_activity(f"{file_path}: {error_msg}", level='warning') data_iterator = [] pbar.update(5) log_activity("Data loading initiated.") all_chunks = [] if final_status != 'ProcessedWithWarnings': try: first_chunk = next(data_iterator, None) except StopIteration: final_status = 'ProcessedWithWarnings' error_msg = f"No data found in {file_path}. Generating partial PDF." log_activity(f"{file_path}: {error_msg}", level='warning') first_chunk = None except Exception as e: final_status = 'ProcessedWithWarnings' error_msg = f"Error reading first chunk: {str(e)}" log_activity(f"{file_path}: {error_msg}", level='warning') first_chunk = None if first_chunk is not None and not first_chunk.empty: all_chunks = [first_chunk] for c in data_iterator: all_chunks.append(c) else: if not error_msg: error_msg = f"{file_path} is empty or has no valid rows." final_status = 'ProcessedWithWarnings' log_activity(f"{file_path}: {error_msg}", level='warning') # 2) Accumulators total_records = 0 flagged_records_count = 0 sample_df = pd.DataFrame() sample_size_per_chunk = 1000 max_total_samples = 10000 # Build final CSV path output_data_file = unique_output_name(file_path, output_dir, suffix='.csv') if os.path.exists(output_data_file): os.remove(output_data_file) write_header = True # Provide a fallback for phenotype_columns if phenotype_columns is None: phenotype_columns = { "Phenotype": ["HPO"], "PrimaryPhenotype": ["HPO"], "DiseaseCode": ["DO"], "TertiaryPhenotype": ["MPO"] } # Track mapping stats cumulative_mapping_stats = {} for column, ontologies in phenotype_columns.items(): for onto_id in ontologies: if onto_id not in cumulative_mapping_stats: cumulative_mapping_stats[onto_id] = {'total_terms': 0, 'mapped_terms': 0} format_valid = True duplicate_records = [] conflicting_records = [] integrity_issues = [] anomalies_detected = pd.DataFrame() missing_counts = pd.Series(dtype=int) unique_id_set = set() global_invalid_mask = pd.DataFrame() row_offset = 0 chunk_progress = 80 # NEW aggregator: track row indices that fail JSON schema schema_fail_indices_global = set() # 3) Process each chunk for chunk in all_chunks: if chunk is None or chunk.empty: continue nrows_chunk = len(chunk) if nrows_chunk == 0: continue chunk.index = range(row_offset, row_offset + nrows_chunk) row_offset += nrows_chunk total_records += nrows_chunk # (A) Validate chunk chunk = convert_nans_to_none_for_string_cols(chunk, schema) try: validator = DataValidator(chunk, schema, unique_identifiers) chunk_results = validator.run_all_validations() except KeyError as e: missing_col = str(e).strip("'") required_cols = schema.get("required", []) if (missing_col in required_cols) or (missing_col in unique_identifiers): final_status = 'ProcessedWithWarnings' msg = (f"Missing *required* or unique-id column '{missing_col}' " f"in chunk => warnings.") log_activity(f"{file_path}: {msg}", level='warning') chunk_results = { "Format Validation": False, "Duplicate Records": pd.DataFrame(), "Conflicting Records": pd.DataFrame(), "Integrity Issues": pd.DataFrame(), "Referential Integrity Issues": pd.DataFrame(), "Anomalies Detected": pd.DataFrame(), "Invalid Mask": pd.DataFrame(False, index=chunk.index, columns=chunk.columns) } else: # It's an optional column => skip silently log_activity( f"Skipping optional column '{missing_col}' for chunk, not raising warnings.", level='info' ) new_id_list = [col for col in unique_identifiers if col != missing_col] validator = DataValidator(chunk, schema, new_id_list) chunk_results = validator.run_all_validations() except Exception as ex: final_status = 'ProcessedWithWarnings' msg2 = f"Error during validation: {str(ex)}" log_activity(f"{file_path}: {msg2}", level='warning') chunk_results = { "Format Validation": False, "Duplicate Records": pd.DataFrame(), "Conflicting Records": pd.DataFrame(), "Integrity Issues": pd.DataFrame(), "Referential Integrity Issues": pd.DataFrame(), "Anomalies Detected": pd.DataFrame(), "Invalid Mask": pd.DataFrame(False, index=chunk.index, columns=chunk.columns) } # --- ADDED DEBUG for chunk_results['Invalid Mask'] --- invalid_mask_chunk = chunk_results["Invalid Mask"] if invalid_mask_chunk.any().any(): log_activity(f"[DEBUG] Invalid cells found in chunk (size={invalid_mask_chunk.shape}).", level='info') # Show a small subset of the True cells stacked_mask = invalid_mask_chunk.stack() true_positions = stacked_mask[stacked_mask == True] limited_positions = true_positions[:20].to_dict() # just show top 20 log_activity(f"[DEBUG] Sample invalid cells: {limited_positions}", level='info') else: log_activity("[DEBUG] No invalid cells in this chunk.", level='info') # Keep track of row-level schema fails if 'SchemaViolationFlag' is set if 'SchemaViolationFlag' in chunk.columns: fails_in_chunk = chunk.index[chunk['SchemaViolationFlag'] == True] for row_id in fails_in_chunk: schema_fail_indices_global.add(row_id) if len(fails_in_chunk) > 0: log_activity( f"[DEBUG] {len(fails_in_chunk)} row(s) with SchemaViolationFlag=True in this chunk.", level='info' ) # Optionally show them snippet_df = chunk.loc[fails_in_chunk].head(5) log_activity(f"[DEBUG] Sample of failing rows:\n{snippet_df}", level='info') # (B) Format validation? if not chunk_results["Format Validation"]: format_valid = False if not chunk_results["Integrity Issues"].empty: integrity_issues.append(chunk_results["Integrity Issues"]) if not chunk_results["Duplicate Records"].empty: duplicate_records.append(chunk_results["Duplicate Records"]) if not chunk_results["Conflicting Records"].empty: conflicting_records.append(chunk_results["Conflicting Records"]) if not chunk_results["Anomalies Detected"].empty: anomalies_detected = pd.concat([anomalies_detected, chunk_results["Anomalies Detected"]]) if not chunk_results["Integrity Issues"].empty: integrity_issues.append(chunk_results["Integrity Issues"]) # Merge invalid mask chunk_invalid_mask = chunk_results["Invalid Mask"] all_cols = sorted(set(global_invalid_mask.columns) | set(chunk_invalid_mask.columns)) global_invalid_mask = global_invalid_mask.reindex(columns=all_cols, fill_value=False) chunk_invalid_mask = chunk_invalid_mask.reindex(columns=all_cols, fill_value=False) global_invalid_mask = pd.concat([global_invalid_mask, chunk_invalid_mask], axis=0) # (C) Duplicates across chunks if unique_identifiers: ids_in_chunk = set(map(tuple, chunk[unique_identifiers].drop_duplicates().values.tolist())) duplicates_in_ids = unique_id_set.intersection(ids_in_chunk) if duplicates_in_ids: cross_dup = chunk[chunk[unique_identifiers].apply(tuple, axis=1).isin(duplicates_in_ids)] duplicate_records.append(cross_dup) unique_id_set.update(ids_in_chunk) # (D) Missing data missing = detect_missing_data(chunk) missing_counts = missing_counts.add(missing, fill_value=0) chunk = flag_missing_data_records(chunk) flagged_records_count += chunk['MissingDataFlag'].sum() # Impute try: chunk = impute_missing_data(chunk, strategy=impute_strategy, field_strategies=field_strategies) except Exception as ex_impute: final_status = 'ProcessedWithWarnings' msg3 = f"Error in imputation: {str(ex_impute)}" log_activity(f"{file_path}: {msg3}", level='warning') chunk = flag_missing_data_records(chunk) # (E) Ontology mapping for column, ontologies in phenotype_columns.items(): if column not in chunk.columns: log_activity(f"Skipping optional column '{column}' (not present).", level='info') continue terms_in_chunk = chunk[column].dropna().unique() if len(terms_in_chunk) == 0: continue mappings = ontology_mapper.map_terms(terms_in_chunk, ontologies, custom_mappings) for onto_id in ontologies: col_name = f"{onto_id}_ID" chunk[col_name] = chunk[column].map( lambda x: mappings.get(str(x), {}).get(onto_id) if pd.notnull(x) else None ) if onto_id not in cumulative_mapping_stats: cumulative_mapping_stats[onto_id] = {'total_terms': 0, 'mapped_terms': 0} valid_terms = [t for t in terms_in_chunk if pd.notnull(t)] cumulative_mapping_stats[onto_id]['total_terms'] += len(valid_terms) cumulative_mapping_stats[onto_id]['mapped_terms'] += sum( 1 for t in valid_terms if mappings.get(str(t), {}).get(onto_id) is not None ) # (F) Accumulate sample df if len(sample_df) < max_total_samples: remaining = max_total_samples - len(sample_df) chunk_sample_size = min(sample_size_per_chunk, remaining) if len(chunk) > chunk_sample_size: sample_chunk = chunk.sample(n=chunk_sample_size, random_state=42) else: sample_chunk = chunk.copy() sample_df = pd.concat([sample_df, sample_chunk], ignore_index=True) # (G) Write chunk to final CSV try: chunk.to_csv(output_data_file, mode='a', index=False, header=write_header) if write_header: write_header = False except Exception as ex_csv: final_status = 'ProcessedWithWarnings' log_activity(f"Error writing CSV output: {str(ex_csv)}", level='warning') # Update progress bar chunk_ratio = max(1, total_records / chunksize) pbar.update(chunk_progress / chunk_ratio) # 4) Summarize if not format_valid: num_invalid_integrity = sum(len(df_part) for df_part in integrity_issues) if integrity_issues else 0 msg4 = f"Format validation failed. {num_invalid_integrity} record(s) do not match the JSON schema." log_activity(f"{file_path}: {msg4}", level='warning') if error_msg: error_msg += f" | {msg4}" else: error_msg = msg4 final_status = 'ProcessedWithWarnings' all_duplicates = pd.concat(duplicate_records).drop_duplicates() if duplicate_records else pd.DataFrame() all_conflicts = pd.concat(conflicting_records).drop_duplicates() if conflicting_records else pd.DataFrame() all_integrity = pd.concat(integrity_issues).drop_duplicates() if integrity_issues else pd.DataFrame() anomalies_detected = anomalies_detected.drop_duplicates() if not anomalies_detected.empty else pd.DataFrame() validation_results = { "Format Validation": format_valid, "Duplicate Records": all_duplicates, "Conflicting Records": all_conflicts, "Integrity Issues": all_integrity, "Referential Integrity Issues": pd.DataFrame(), "Anomalies Detected": anomalies_detected, "Invalid Mask": global_invalid_mask.sort_index() } # 5) Mapping stats mapping_success_rates = {} for onto_id, stats in cumulative_mapping_stats.items(): total_terms = stats['total_terms'] mapped_terms = stats['mapped_terms'] success_rate = (mapped_terms / total_terms) * 100 if total_terms > 0 else 0 mapping_success_rates[onto_id] = { 'total_terms': total_terms, 'mapped_terms': mapped_terms, 'success_rate': success_rate } total_records = total_records or 1 num_schema_fails = len(schema_fail_indices_global) valid_records_for_schema = total_records - num_schema_fails schema_validation_score = (valid_records_for_schema / total_records) * 100 total_cells = total_records * len(sample_df.columns) total_missing = missing_counts.sum() if len(sample_df.columns) == 0: missing_data_score = 100.0 else: missing_data_score = ((total_cells - total_missing) / total_cells) * 100 if total_cells > 0 else 100.0 success_rates_list = [v['success_rate'] for v in mapping_success_rates.values()] mapping_success_score = sum(success_rates_list)/len(success_rates_list) if success_rates_list else 0 overall_quality_score = (schema_validation_score + missing_data_score + mapping_success_score) / 3.0 quality_scores = { 'Schema Validation Score': schema_validation_score, 'Missing Data Score': missing_data_score, 'Mapping Success Score': mapping_success_score, 'Overall Quality Score': overall_quality_score } report_path = unique_output_name(file_path, output_dir, suffix='_report.pdf') figs = create_visual_summary(sample_df, phenotype_columns=phenotype_columns, output_image_path=None) visualization_images = [] for idx, fig in enumerate(figs): image_filename = f"{os.path.splitext(os.path.basename(file_path))[0]}_visual_{idx}.png" image_path = os.path.join(output_dir, image_filename) try: fig.write_image(image_path, format='png', scale=2) visualization_images.append(image_path) except Exception as e: log_activity(f"Error saving image {image_filename}: {e}", level='error') base_display_name = os.path.basename(file_path) generate_qc_report( validation_results=validation_results, missing_data=missing_counts, flagged_records_count=flagged_records_count, mapping_success_rates=mapping_success_rates, visualization_images=visualization_images, impute_strategy=impute_strategy, quality_scores=quality_scores, output_path_or_buffer=report_path, report_format=report_format, file_identifier=base_display_name ) log_activity(f"{file_path}: QC report generated at {report_path}.") pbar.update(5) log_activity(f"{file_path}: Processed data saved at {output_data_file}") pbar.update(5) pbar.close() return { 'file': file_path, 'status': final_status, 'error': error_msg, 'validation_results': validation_results, 'missing_data': missing_counts, 'flagged_records_count': flagged_records_count, 'processed_file_path': output_data_file, 'report_path': report_path, 'mapping_success_rates': mapping_success_rates, 'visualization_images': visualization_images, 'quality_scores': quality_scores } except Exception as e: log_activity(f"Error processing file {file_path}: {str(e)}", level='error') return { 'file': file_path, 'status': 'Error', 'error': str(e) }
[docs]def batch_process( files, schema_path, config_path, unique_identifiers, custom_mappings_path=None, impute_strategy='mean', output_dir='reports', target_ontologies=None, report_format='pdf', chunksize=10000, phenotype_columns=None, phenotype_column=None, log_file_for_children=None ): log_activity(f"[ParentProcess] Starting on: {files}", level='info') # 1) Load the schema with open(schema_path) as f: schema = json.load(f) # 2) Load config config = load_config(config_path) # 3) Create OntologyMapper ontology_mapper = OntologyMapper(config) # 4) Load custom mappings custom_mappings = None if custom_mappings_path: with open(custom_mappings_path) as f: custom_mappings = json.load(f) # Convert old style to new style if needed if phenotype_column and not phenotype_columns: phenotype_columns = {phenotype_column: ["HPO"]} # 5) In parallel, call child_process_run results = [] with concurrent.futures.ProcessPoolExecutor() as executor: futures = [] for file_path in files: future = executor.submit( child_process_run, file_path, schema, ontology_mapper, unique_identifiers, custom_mappings, impute_strategy, None, output_dir, target_ontologies, report_format, chunksize, phenotype_columns, log_file_for_children ) futures.append(future) for future in concurrent.futures.as_completed(futures): try: result = future.result() results.append(result) except Exception as e: log_activity(f"Error in batch processing: {str(e)}", level='error') dummy_result = { 'file': "<Unknown>", 'status': 'Error', 'error': str(e) } results.append(dummy_result) return results