# -*- coding: utf-8 -*- """Strict-Track-Sentence-and-Doc-Bassline-Readability-Arabertv2-d3tok-reg.ipynb Automatically generated by Colab. Original file is located at https://colab.research.google.com/drive/1-5DN0z-QR-4uCo715FrzhQL25z67KMgP """ import pandas as pd import numpy as np import os import torch import zipfile from sklearn.metrics import cohen_kappa_score from torch.utils.data import Dataset as TorchDataset from transformers import ( AutoTokenizer, AutoModelForSequenceClassification, TrainingArguments, Trainer, EarlyStoppingCallback ) from camel_tools.disambig.mle import MLEDisambiguator MODEL_NAME = "aubmindlab/bert-base-arabertv2" NUM_LABELS = 1 TARGET_CLASSES = 19 # --- MODIFICATION: Paths are now relative to the script's location --- # Assumes 'data', 'results', and 'submission' folders are in the same directory as the script. BASE_DIR = "." DATA_DIR = os.path.join(BASE_DIR, "data") CHECKPOINT_DIR = os.path.join(BASE_DIR, "results", f"regression_{MODEL_NAME.split('/')[-1]}") SUBMISSION_DIR = os.path.join(BASE_DIR, "submission") # Create output directories if they don't exist os.makedirs(CHECKPOINT_DIR, exist_ok=True) os.makedirs(SUBMISSION_DIR, exist_ok=True) # File Paths BAREC_TRAIN_PATH = os.path.join(DATA_DIR, 'train.csv') BAREC_DEV_PATH = os.path.join(DATA_DIR, 'dev.csv') BLIND_TEST_PATH = os.path.join(DATA_DIR, 'blind_test_data.csv') SUBMISSION_PATH = os.path.join(SUBMISSION_DIR, "submission_regression_final.csv") ZIPPED_SUBMISSION_PATH = os.path.join(SUBMISSION_DIR, "submission_regression_final.zip") # --- Paths for preprocessed (cached) files --- TRAIN_PREPROCESSED_PATH = os.path.join(DATA_DIR, 'train_preprocessed.csv') DEV_PREPROCESSED_PATH = os.path.join(DATA_DIR, 'dev_preprocessed.csv') # =================================================================== # TEMPORARY DEBUGGING CODE # =================================================================== def debug_camel_object(disambiguator): print("--- STARTING DEBUG ---") # A simple Arabic sentence to test with sample_text = "جاء الولد" disambiguated_words = disambiguator.disambiguate(sample_text.split()) if not disambiguated_words or not disambiguated_words[0].analyses: print("Could not get analysis for the sample text.") return # Get the top analysis object for the first word, which is causing the error top_analysis = disambiguated_words[0].analyses[0] print("\n--- DEBUGGING 'ScoredAnalysis' OBJECT ---") print(f"Object Type: {type(top_analysis)}") print(f"Object Itself: {top_analysis}") print("\n--- Available Attributes (from dir()) ---") # This will list every possible attribute and method for the object print(dir(top_analysis)) print("\n--- END OF DEBUG ---") # We stop the program here because we only need this output exit() # This line calls the debugging function debug_camel_object(mle_disambiguator) # ===================================================================================== # 3. DATA LOADING AND PREPROCESSING (with Caching) # ===================================================================================== # --- Helper function for preprocessing (must be defined before it is used) --- def preprocess_d3tok(text, disambiguator): """ Preprocesses text into the D3Tok format as described in the BAREC paper. This involves segmenting words into their base and clitic forms. """ if not isinstance(text, str) or not text.strip(): return "" disambiguated_words = disambiguator.disambiguate(text.split()) d3tok_forms = [] for word in disambiguated_words: # Check if the list of analyses is NOT empty before accessing it if word.analyses: # If analyses exist, get the d3tok from the top one d3tok_forms.append(word.analyses[0].analysis['d3tok']) else: # If no analysis is found, fall back to the original word d3tok_forms.append(word.word) return ' '.join(d3tok_forms) # --- Main function to load or create preprocessed data --- def load_or_preprocess_data(): """ Loads preprocessed data if it exists, otherwise, it runs the preprocessing and saves the results for future use. """ print("--- Loading BAREC Data ---") # Check if preprocessed files already exist if os.path.exists(TRAIN_PREPROCESSED_PATH) and os.path.exists(DEV_PREPROCESSED_PATH): print("✅ Found preprocessed files. Loading them directly...") train_df = pd.read_csv(TRAIN_PREPROCESSED_PATH) val_df = pd.read_csv(DEV_PREPROCESSED_PATH) # Ensure the 'text' column is treated as a string, even if it contains numbers train_df['text'] = train_df['text'].astype(str) val_df['text'] = val_df['text'].astype(str) print(f"Successfully loaded {len(train_df)} training and {len(val_df)} validation records.") return train_df, val_df # If they don't exist, run the original loading and preprocessing else: print("Preprocessed files not found. Starting one-time preprocessing...") try: # Load raw data train_df = pd.read_csv(BAREC_TRAIN_PATH) val_df = pd.read_csv(BAREC_DEV_PATH) # Rename columns train_df = train_df[['Sentence', 'Readability_Level_19']].rename(columns={'Sentence': 'text', 'Readability_Level_19': 'label'}) val_df = val_df[['Sentence', 'Readability_Level_19']].rename(columns={'Sentence': 'text', 'Readability_Level_19': 'label'}) # Drop NA and format labels train_df.dropna(subset=['text', 'label'], inplace=True) val_df.dropna(subset=['label', 'text'], inplace=True) # Ensure 'text' column is string train_df['text'] = train_df['text'].astype(str) val_df['text'] = val_df['text'].astype(str) train_df['label'] = train_df['label'].astype(int) - 1 val_df['label'] = val_df['label'].astype(int) - 1 train_df['label'] = train_df['label'].astype(float) val_df['label'] = val_df['label'].astype(float) print(f"Successfully loaded raw data: {len(train_df)} training and {len(val_df)} validation records.") # --- Run the time-consuming preprocessing --- print("\n--- Preprocessing Text to D3Tok format (this will only run once) ---") mle_disambiguator = MLEDisambiguator.pretrained() train_df['text'] = train_df['text'].apply(lambda x: preprocess_d3tok(x, mle_disambiguator)) val_df['text'] = val_df['text'].apply(lambda x: preprocess_d3tok(x, mle_disambiguator)) print("Text preprocessing finished.") # --- Save the newly preprocessed files for next time --- print("\n--- Saving preprocessed data for future use... ---") train_df.to_csv(TRAIN_PREPROCESSED_PATH, index=False) val_df.to_csv(DEV_PREPROCESSED_PATH, index=False) print(f"Saved preprocessed files to '{TRAIN_PREPROCESSED_PATH}' and '{DEV_PREPROCESSED_PATH}'") return train_df, val_df except FileNotFoundError: print(f"❗️ ERROR: Raw data file not found. Make sure 'train.csv' and 'dev.csv' are in the '{DATA_DIR}' directory.") return None, None except Exception as e: print(f"❗️ ERROR during initial processing: {e}") return None, None # --- Execute Data Loading and Preprocessing --- # This single line now handles everything train_df, val_df = load_or_preprocess_data() # Initialize the tokenizer, which is needed for the Dataset class later if train_df is not None: tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME) else: exit() # ===================================================================================== # 4. DATASET AND METRICS # ===================================================================================== class ReadabilityDataset(TorchDataset): def __init__(self, texts, labels=None): self.encodings = tokenizer(texts, truncation=True, padding="max_length", max_length=256) self.labels = labels def __getitem__(self, idx): item = {key: torch.tensor(val[idx]) for key, val in self.encodings.items()} if self.labels is not None: item['labels'] = torch.tensor(self.labels[idx], dtype=torch.float) return item def __len__(self): return len(self.encodings.get('input_ids', [])) def compute_metrics(p): """ Computes metrics for the regression task. Predictions are rounded to the nearest integer and clipped to the valid label range. """ preds = p.predictions.flatten() rounded_preds = np.round(preds) clipped_preds = np.clip(rounded_preds, 0, TARGET_CLASSES - 1).astype(int) labels = p.label_ids.astype(int) qwk = cohen_kappa_score(labels, clipped_preds, weights='quadratic') return {"qwk": qwk} # ===================================================================================== # 5. MODEL TRAINING # ===================================================================================== print("\n===== ✨ INITIALIZING REGRESSION MODEL AND TRAINER =====\n") model = AutoModelForSequenceClassification.from_pretrained(MODEL_NAME, num_labels=NUM_LABELS) train_dataset = ReadabilityDataset(train_df['text'].tolist(), train_df['label'].tolist()) val_dataset = ReadabilityDataset(val_df['text'].tolist(), val_df['label'].tolist()) training_args = TrainingArguments( output_dir=CHECKPOINT_DIR, num_train_epochs=6, per_device_train_batch_size=16, # Adjusted for local GPU memory; increase if you have more VRAM per_device_eval_batch_size=32, # Adjusted for local GPU memory learning_rate=5e-5, warmup_ratio=0.1, weight_decay=0.01, logging_steps=100, eval_strategy="epoch", save_strategy="epoch", load_best_model_at_end=True, metric_for_best_model="qwk", greater_is_better=True, save_total_limit=2, fp16=torch.cuda.is_available(), # Automatically use mixed precision if a CUDA GPU is available report_to="none" ) trainer = Trainer( model=model, args=training_args, train_dataset=train_dataset, eval_dataset=val_dataset, compute_metrics=compute_metrics, callbacks=[EarlyStoppingCallback(early_stopping_patience=2)] ) print("Starting training...") trainer.train() print("✅ Training finished.") # ===================================================================================== # 6. FINAL PREDICTION AND SUBMISSION # ===================================================================================== print("\n===== 🏆 FINAL PREDICTION AND SUBMISSION =====\n") try: # NOTE: This section assumes a sentence-level prediction task as per the competition's format. test_df = pd.read_csv(BLIND_TEST_PATH) test_df.dropna(subset=['Sentence'], inplace=True) print("Preprocessing blind test text to D3Tok format...") test_df['processed_text'] = test_df['Sentence'].apply(lambda x: preprocess_d3tok(x, mle_disambiguator)) print("Generating predictions on the test set...") test_dataset = ReadabilityDataset(test_df['processed_text'].tolist()) predictions = trainer.predict(test_dataset) raw_preds = predictions.predictions.flatten() rounded_preds = np.round(raw_preds) clipped_preds = np.clip(rounded_preds, 0, TARGET_CLASSES - 1) # Add 1 to get back to the 1-19 scale test_df['Prediction'] = (clipped_preds + 1).astype(int) # Create the submission DataFrame in the correct format submission_df = test_df[['Sentence ID', 'Prediction']] print(f"Saving prediction file to: {SUBMISSION_PATH}") submission_df.to_csv(SUBMISSION_PATH, index=False) print(f"\nCompressing '{os.path.basename(SUBMISSION_PATH)}' into '{os.path.basename(ZIPPED_SUBMISSION_PATH)}'...") with zipfile.ZipFile(ZIPPED_SUBMISSION_PATH, 'w', zipfile.ZIP_DEFLATED) as zipf: zipf.write(SUBMISSION_PATH, arcname=os.path.basename(SUBMISSION_PATH)) print(f"Submission file '{os.path.basename(ZIPPED_SUBMISSION_PATH)}' created successfully.") except FileNotFoundError: print(f"❗️ ERROR: Test file not found. Make sure 'blind_test_data.csv' is in the '{DATA_DIR}' directory.") except Exception as e: print(f"An error occurred during final prediction: {e}") print("\n--- Script Finished ---")