mr-kush committed
Commit 58bdb4f · 1 Parent(s): 1412806

implement dataset preparation pipeline with environment variable validation and database connection handling

Files changed (1)
  1. prepare_dataset_pipeline.py +111 -0
prepare_dataset_pipeline.py ADDED
@@ -0,0 +1,111 @@
+ import os
+ from preprocess_and_prepare_dataset import preprocess_and_push_dataset
+ from prepare_pd_df import fetch_misclassified_dataframe
+ from dotenv import load_dotenv
+ from sqlalchemy import create_engine
+ import time
+ from sqlalchemy.exc import SQLAlchemyError
+ from huggingface_hub import HfApi
+
+ load_dotenv()
+
+
+ def prepare_datasets():
+     """
+     Fetch misclassified data, then preprocess and push datasets for department and urgency.
+     Uses environment variables:
+     - HF_TOKEN
+     - DEPARTMENT_DATASET
+     - URGENCY_DATASET
+     - POSTGRES_URL
+     - PREPARE_DATASET_SPACE_ID
+     """
+     # Load configuration from environment variables
+
+     hf_token = os.getenv("HF_TOKEN", None)
+     dept_dataset_dir = os.getenv("DEPARTMENT_DATASET", None)
+     urgency_dataset_dir = os.getenv("URGENCY_DATASET", None)
+     DB_URL = os.getenv("POSTGRES_URL", None)
+     PREPARE_DATASET_SPACE_ID = os.getenv("PREPARE_DATASET_SPACE_ID", None)
+
+     # Check that the required environment variables are set
+     if not DB_URL:
+         raise EnvironmentError("Environment variable POSTGRES_URL must be set")
+     if not hf_token:
+         raise ValueError("HF_TOKEN environment variable is not set")
+     if not dept_dataset_dir:
+         raise ValueError("DEPARTMENT_DATASET environment variable is not set")
+     if not urgency_dataset_dir:
+         raise ValueError("URGENCY_DATASET environment variable is not set")
+
+
+
+     try:
+
+         # create engine with a pre-ping to avoid stale connections
+         engine = create_engine(DB_URL, pool_pre_ping=True)
+
+         # validate connection with a simple query and a small retry/backoff strategy
+         max_attempts = 3
+         for attempt in range(1, max_attempts + 1):
+             try:
+                 with engine.connect() as conn:
+                     conn.exec_driver_sql("SELECT 1")
+                 break
+             except SQLAlchemyError as e:
+                 if attempt >= max_attempts:
+                     raise RuntimeError(f"Unable to connect to DB after {max_attempts} attempts: {e}") from e
+                 wait = 2 ** attempt
+                 print(f"DB connection attempt {attempt} failed: {e}. Retrying in {wait}s...")
+                 time.sleep(wait)
+
+     except Exception as e:
+         raise RuntimeError(f"Error setting up SQLAlchemy engine: {e}") from e
+
+
+
+     # Mapping label -> dataset directory
+     dataset_mapping = {
+         "department": dept_dataset_dir,
+         "urgency": urgency_dataset_dir
+     }
+
+     for label, dataset_dir in dataset_mapping.items():
+         try:
+             print(f"Fetching misclassified data for '{label}'...")
+             df = fetch_misclassified_dataframe(label_column=label,
+                                                engine=engine,
+                                                correct_ratio=0.5
+                                                )
+
+             print(f"Preprocessing and pushing dataset for '{label}' to '{dataset_dir}'...", flush=True)
+             preprocess_and_push_dataset(
+                 df=df,
+                 hf_token=hf_token,
+                 hf_dataset_dir=dataset_dir,
+                 label_column=label,
+             )
+
+             print(f"Successfully processed and pushed '{label}' dataset.\n")
+
+         except Exception as e:
+             raise RuntimeError(f"Error processing '{label}' dataset: {e}") from e
+
+     # Pause the Space if this script was run inside a Hugging Face Space
+     if PREPARE_DATASET_SPACE_ID:
+         try:
+             print(f"[{time.strftime('%H:%M:%S')}] Attempting to pause Hugging Face Space...", flush=True)
+
+             api = HfApi()
+
+             api.pause_space(repo_id=PREPARE_DATASET_SPACE_ID,
+                             token=hf_token)
+
+             print(f"[{time.strftime('%H:%M:%S')}] Pause command executed.", flush=True)
+         except Exception as e:
+             print(f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] WARNING: Failed to pause HF Space: {e}", flush=True)
+
+
+ # Entry point
+ if __name__ == "__main__":
+     prepare_datasets()
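
A minimal usage sketch, assuming the file is importable as prepare_dataset_pipeline; every environment variable value below is a placeholder, not a real credential or dataset repo id:

import os

# Placeholder values -- substitute real credentials and dataset repo ids before running.
os.environ["POSTGRES_URL"] = "postgresql+psycopg2://user:password@host:5432/dbname"
os.environ["HF_TOKEN"] = "hf_xxx"
os.environ["DEPARTMENT_DATASET"] = "your-org/department-dataset"
os.environ["URGENCY_DATASET"] = "your-org/urgency-dataset"
# PREPARE_DATASET_SPACE_ID is optional; set it only when running inside a Hugging Face Space.

from prepare_dataset_pipeline import prepare_datasets

prepare_datasets()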