Spaces:
Runtime error
Runtime error
| import yaml | |
| import json | |
| import markdown | |
| from fuzzywuzzy import fuzz, process | |
| from typing import Dict, List, Any | |
| import logging | |
def parse_yaml(yaml_text: str) -> Dict[str, Any]:
    """Parse a YAML document with ``yaml.safe_load`` and return the result.

    The loader's result is handed back unchanged; this function does not
    check that the top-level node is a mapping.

    Raises:
        yaml.YAMLError: re-raised after the message has been logged.
    """
    try:
        parsed = yaml.safe_load(yaml_text)
    except yaml.YAMLError as err:
        # Log for operators, then let the caller decide how to react.
        logging.error(f"YAML parsing error: {str(err)}")
        raise
    return parsed
def fuzzy_search(query: str, data: Dict[str, Any], threshold: int = 60) -> List[Dict[str, Any]]:
    """Fuzzy-match ``query`` against the top-level keys and scalar values of ``data``.

    Only entries whose value is a ``str``, ``int`` or ``float`` are examined;
    nested containers and ``None`` values are skipped. Each such entry can
    contribute up to two hits: one for its key and one for its value.

    Args:
        query: Text to look for; compared case-insensitively.
        data: Mapping to search. Anything that is not a ``dict`` yields ``[]``.
        threshold: Minimum ``fuzz.partial_ratio`` score for a hit to be kept.

    Returns:
        A list of dicts with the keys ``'type'`` (``'key'`` or ``'value'``),
        ``'field'`` (the original key), ``'value'`` (the value as a string)
        and ``'score'``, ordered by score from highest to lowest.
    """
    matches: List[Dict[str, Any]] = []
    if not isinstance(data, dict):
        return matches

    # Loop-invariant: lower-case the query once instead of twice per entry.
    query_lc = query.lower()

    for key, value in data.items():
        if not isinstance(value, (str, int, float)):
            continue
        value_str = str(value)
        # Keys are not guaranteed to be strings (YAML allows e.g. integer
        # keys), so stringify before lower-casing instead of crashing.
        candidates = (('key', str(key)), ('value', value_str))
        for match_type, candidate in candidates:
            score = fuzz.partial_ratio(query_lc, candidate.lower())
            if score >= threshold:
                matches.append({
                    'type': match_type,
                    'field': key,
                    'value': value_str,
                    'score': score,
                })

    # Best matches first; list.sort is stable, so ties keep insertion order.
    matches.sort(key=lambda match: match['score'], reverse=True)
    return matches
# Shortcode -> emoji table used by render_markdown. The characters are written
# as escapes so the table survives the file being re-encoded or scraped.
_EMOJI_MAP = {
    # NOTE(review): the exact original glyph for ':smile:' is not certain;
    # U+1F604 follows the common shortcode convention - confirm.
    ':smile:': '\U0001F604',       # smiling face with open mouth and smiling eyes
    ':heart:': '\u2764\uFE0F',     # red heart (emoji presentation)
    ':thumbsup:': '\U0001F44D',    # thumbs up
    ':thumbsdown:': '\U0001F44E',  # thumbs down
    ':fire:': '\U0001F525',        # fire
    ':rocket:': '\U0001F680',      # rocket
    ':star:': '\u2B50',            # star
    ':check:': '\u2705',           # check mark button
    ':x:': '\u274C',               # cross mark
    ':warning:': '\u26A0\uFE0F',   # warning sign (emoji presentation)
    ':info:': '\u2139\uFE0F',      # information source (emoji presentation)
    ':bulb:': '\U0001F4A1',        # light bulb
    ':tada:': '\U0001F389',        # party popper
}


def render_markdown(text: str) -> str:
    """Render Markdown to HTML and expand a small set of emoji shortcodes.

    The text is converted with the ``extra`` and ``codehilite`` extensions.
    Shortcodes such as ``:fire:`` are then replaced in the generated HTML by
    plain string substitution, so a shortcode inside a code block or an
    attribute value is replaced as well.

    Args:
        text: Markdown source.

    Returns:
        The rendered HTML. If rendering raises, the error is logged and the
        original ``text`` is returned unchanged (not HTML-escaped).
    """
    try:
        md = markdown.Markdown(extensions=['extra', 'codehilite'])
        html = md.convert(text)
        for code, emoji in _EMOJI_MAP.items():
            html = html.replace(code, emoji)
        return html
    except Exception as e:
        # Best effort: showing raw text is preferable to failing the caller.
        logging.error(f"Markdown rendering error: {str(e)}")
        return text
def create_dynamic_table(table_name: str, schema: Dict[str, Any]) -> bool:
    """Placeholder for dynamic table creation; currently only logs the request.

    No table is created here. Per the original author's note, records are
    kept in the generic DataRecord model with JSON storage for now.

    Returns:
        ``True`` once the request has been logged, ``False`` if building or
        emitting the log message raised.
    """
    try:
        message = f"Creating dynamic table: {table_name} with schema: {schema}"
        logging.info(message)
    except Exception as exc:
        logging.error(f"Error creating dynamic table: {str(exc)}")
        return False
    else:
        return True
def validate_schema(schema: Dict[str, Any]) -> bool:
    """Return ``True`` when ``schema`` has the expected table-schema shape.

    A valid schema is a dict whose ``'fields'`` entry is a list, and every
    element of that list is a dict containing both ``'name'`` and ``'type'``.
    An empty ``'fields'`` list is accepted.
    """
    if not isinstance(schema, dict) or 'fields' not in schema:
        return False

    field_specs = schema['fields']
    if not isinstance(field_specs, list):
        return False

    return all(
        isinstance(spec, dict) and 'name' in spec and 'type' in spec
        for spec in field_specs
    )
def process_pipeline_data(pipeline_config: Dict[str, Any], source_data: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Run ``source_data`` through the transformations in ``pipeline_config``.

    ``pipeline_config['transformations']`` is a list of dicts applied in
    order. Each has a ``'type'``:

    * ``'filter'`` - keep records for which ``eval_condition`` accepts the
      transformation's ``'condition'``.
    * ``'map'`` - rename fields via ``apply_mapping`` using ``'mapping'``.
    * ``'sort'`` - order by ``'field'`` (missing values sort as ``''``),
      descending when ``'reverse'`` is true.

    Transformations of any other type are ignored.

    Args:
        pipeline_config: Pipeline description; a missing or null
            ``'transformations'`` entry means "no transformations".
        source_data: Input records. Neither the list nor its records are
            modified.

    Returns:
        A new list of processed records. If any step raises, the error is
        logged and the untouched ``source_data`` is returned instead.
    """
    # Copy every record, not just the list: 'map' renames keys in place, and
    # with a shallow list copy that would rewrite the caller's own dicts and
    # make the error fallback below return half-processed data.
    processed_data = [
        item.copy() if isinstance(item, dict) else item
        for item in source_data
    ]
    try:
        # `or []` also covers an explicit null in the config.
        transformations = pipeline_config.get('transformations') or []
        for transformation in transformations:
            transform_type = transformation.get('type')
            if transform_type == 'filter':
                condition = transformation.get('condition')
                processed_data = [
                    item for item in processed_data
                    if eval_condition(item, condition)
                ]
            elif transform_type == 'map':
                mapping = transformation.get('mapping')
                for item in processed_data:
                    apply_mapping(item, mapping)
            elif transform_type == 'sort':
                field = transformation.get('field')
                reverse = transformation.get('reverse', False)
                processed_data.sort(key=lambda x: x.get(field, ''), reverse=reverse)
        return processed_data
    except Exception as e:
        # Best effort: fall back to the original input rather than failing.
        logging.error(f"Pipeline processing error: {str(e)}")
        return source_data
def eval_condition(data: Dict[str, Any], condition: Dict[str, Any]) -> bool:
    """Decide whether ``data`` satisfies ``condition``.

    ``condition`` supplies ``'field'``, ``'operator'`` and ``'value'``.
    Supported operators:

    * ``'equals'`` - ``data[field] == value``.
    * ``'contains'`` - case-insensitive substring test on the string forms;
      ``False`` when either side is ``None``.
    * ``'gt'`` / ``'lt'`` - numeric comparison after ``float()`` conversion,
      with falsy operands treated as ``0``; ``False`` when a conversion
      raises ``ValueError`` or ``TypeError``.

    The check is permissive: a missing field name or operator, an unknown
    operator, or any other exception makes the condition pass (``True``).
    """
    try:
        field, op, expected = (
            condition.get(name) for name in ('field', 'operator', 'value')
        )
        if not (field and op):
            return True

        actual = data.get(field)

        if op == 'equals':
            return actual == expected

        if op == 'contains':
            if actual is None or expected is None:
                return False
            needle = str(expected).lower()
            haystack = str(actual).lower()
            return needle in haystack

        if op == 'gt' or op == 'lt':
            try:
                left = float(actual or 0)
                right = float(expected or 0)
            except (ValueError, TypeError):
                return False
            return left > right if op == 'gt' else left < right

        # Unrecognised operators do not filter anything out.
        return True
    except Exception:
        return True
def apply_mapping(data: Dict[str, Any], mapping: Dict[str, str]) -> None:
    """Rename fields of ``data`` in place according to ``mapping``.

    Args:
        data: Record to modify.
        mapping: ``{old_field: new_field}`` pairs. Old fields that are not
            present in ``data`` are ignored.

    All renames are resolved against the record as it was on entry, so
    overlapping mappings behave as expected: ``{'a': 'b', 'b': 'a'}`` swaps
    the two fields and ``{'a': 'b', 'b': 'c'}`` shifts both, instead of the
    second rename consuming the result of the first and dropping a value.

    Errors (for example ``mapping`` being ``None``) are logged and swallowed.
    """
    try:
        # Snapshot the new key/value pairs before touching `data`; if this
        # step raises, the record is left exactly as it was.
        renamed = {
            new_field: data[old_field]
            for old_field, new_field in mapping.items()
            if old_field in data
        }
        for old_field in mapping:
            data.pop(old_field, None)
        data.update(renamed)
    except Exception as e:
        logging.error(f"Mapping error: {str(e)}")