Spaces:
Sleeping
Sleeping
| # utils.py | |
| import os, json, io | |
| from pathlib import Path | |
| from datetime import datetime | |
| from PIL import Image | |
| import exifread | |
| import folium | |
| from folium import plugins | |
| from config import HISTORY_FILE | |
| # Global debug messages for Gradio | |
| debug_messages = [] | |
| def add_debug(msg): | |
| """Add debug message that can be shown in Gradio""" | |
| global debug_messages | |
| debug_messages.append(msg) | |
| print(msg) # Also print to console/logs | |
| def get_debug_messages(): | |
| """Get and clear debug messages""" | |
| global debug_messages | |
| msgs = "\n".join(debug_messages) | |
| debug_messages = [] | |
| return msgs | |
| # ---------------- GPS Extraction ---------------- | |
| def extract_gps_from_image(image): | |
| """Extract GPS info from PIL image or file path using exifread""" | |
| try: | |
| add_debug("🔍 Starting GPS extraction...") | |
| # Case 1: If path given | |
| if isinstance(image, (str, bytes, os.PathLike)): | |
| with open(image, "rb") as f: | |
| tags = exifread.process_file(f, details=False) | |
| # Case 2: If PIL Image | |
| elif isinstance(image, Image.Image): | |
| import io | |
| buf = io.BytesIO() | |
| # Save with EXIF data if available | |
| if "exif" in image.info: | |
| image.save(buf, format="JPEG", exif=image.info["exif"]) | |
| else: | |
| image.save(buf, format="JPEG") | |
| buf.seek(0) | |
| tags = exifread.process_file(buf, details=False) | |
| else: | |
| add_debug(f"⚠ Unsupported type: {type(image)}") | |
| return {"coords": None, "address": None} | |
| # Debug: show GPS tags | |
| gps_tags = {k: str(v) for k, v in tags.items() if "GPS" in str(k)} | |
| if gps_tags: | |
| add_debug(f"📍 Found {len(gps_tags)} GPS tags") | |
| for key in list(gps_tags.keys())[:3]: | |
| add_debug(f" • {key}") | |
| else: | |
| add_debug("❌ No GPS tags found") | |
| return {"coords": None, "address": None} | |
| # Extract coordinates | |
| lat = _to_decimal(tags.get("GPS GPSLatitude"), tags.get("GPS GPSLatitudeRef")) | |
| lon = _to_decimal(tags.get("GPS GPSLongitude"), tags.get("GPS GPSLongitudeRef")) | |
| if lat is not None and lon is not None: | |
| add_debug(f"✅ GPS converted: {lat:.6f}, {lon:.6f}") | |
| add_debug(f"🗺️ https://maps.google.com/?q={lat},{lon}") | |
| return {"coords": (lat, lon), "address": None} | |
| else: | |
| add_debug("⚠ Could not convert GPS to decimal") | |
| except Exception as e: | |
| add_debug(f"❌ GPS extraction error: {e}") | |
| return {"coords": None, "address": None} | |
| def _to_decimal(dms_data, ref): | |
| """Convert DMS coordinates to decimal degrees""" | |
| if not dms_data: | |
| return None | |
| try: | |
| dms = [] | |
| # exifread IfdTag has .values | |
| values = getattr(dms_data, "values", dms_data) | |
| for val in values: | |
| if hasattr(val, "num") and hasattr(val, "den"): # Ratio | |
| dms.append(float(val.num) / float(val.den) if val.den != 0 else 0) | |
| elif isinstance(val, str) and "/" in val: # "136974/3125" | |
| num, den = val.split("/") | |
| den = float(den) if float(den) != 0 else 1 | |
| dms.append(float(num) / den) | |
| else: | |
| dms.append(float(val)) | |
| if len(dms) < 2: | |
| return None | |
| degrees, minutes = dms[0], dms[1] | |
| seconds = dms[2] if len(dms) > 2 else 0 | |
| decimal = degrees + (minutes / 60.0) + (seconds / 3600.0) | |
| if ref and str(ref).upper() in ["S", "W"]: | |
| decimal = -decimal | |
| return decimal | |
| except Exception as e: | |
| add_debug(f"⚠ Coordinate conversion error: {e}") | |
| return None | |
| # ---------------- History ---------------- | |
| def save_detection_to_history(detection): | |
| """Save detection to persistent history file""" | |
| try: | |
| Path("data").mkdir(exist_ok=True) | |
| history = [] | |
| if os.path.exists(HISTORY_FILE): | |
| try: | |
| with open(HISTORY_FILE, "r") as f: | |
| history = json.load(f) | |
| except json.JSONDecodeError: | |
| add_debug("⚠ History file corrupted, starting fresh") | |
| history = [] | |
| # Ensure GPS data is serializable | |
| if detection.get("gps") and detection["gps"].get("coords"): | |
| coords = detection["gps"]["coords"] | |
| # Convert tuple to list for JSON | |
| detection["gps"]["coords"] = [coords[0], coords[1]] | |
| history.append(detection) | |
| # Keep only last 100 detections | |
| if len(history) > 100: | |
| history = history[-100:] | |
| with open(HISTORY_FILE, "w") as f: | |
| json.dump(history, f, indent=2) | |
| add_debug(f"💾 Saved detection #{len(history)} to history") | |
| except Exception as e: | |
| add_debug(f"❌ Failed to save history: {str(e)}") | |
| def load_detection_history(): | |
| """Load detection history, converting GPS coords back to tuples""" | |
| try: | |
| if os.path.exists(HISTORY_FILE): | |
| with open(HISTORY_FILE, "r") as f: | |
| history = json.load(f) | |
| # Convert GPS coords from list back to tuple | |
| for detection in history: | |
| if detection.get("gps") and detection["gps"].get("coords"): | |
| coords = detection["gps"]["coords"] | |
| if isinstance(coords, list) and len(coords) == 2: | |
| detection["gps"]["coords"] = tuple(coords) | |
| return history | |
| except Exception as e: | |
| add_debug(f"⚠ Failed to load history: {str(e)}") | |
| return [] | |
| # ---------------- Map ---------------- | |
| def create_detection_map(detections): | |
| """Create Folium map with detection markers""" | |
| if not detections: | |
| return "<div style='padding:20px; text-align:center; color:#666;'>🗺️ No GPS data available yet<br>Upload images with location data to see them on the map</div>" | |
| # Find detections with GPS | |
| gps_detections = [d for d in detections if d.get("gps") and d["gps"].get("coords")] | |
| if not gps_detections: | |
| return "<div style='padding:20px; text-align:center; color:#666;'>📍 No detections with GPS coordinates<br>Make sure GPS is enabled when taking photos</div>" | |
| # Get the last GPS coordinate for center | |
| last_coords = gps_detections[-1]["gps"]["coords"] | |
| # Create map centered on last detection | |
| m = folium.Map( | |
| location=last_coords, | |
| zoom_start=13, | |
| tiles='OpenStreetMap' | |
| ) | |
| # Add fullscreen button | |
| plugins.Fullscreen( | |
| position='topright', | |
| title='Fullscreen', | |
| title_cancel='Exit fullscreen', | |
| force_separate_button=True | |
| ).add_to(m) | |
| # Add markers for each detection | |
| for idx, d in enumerate(gps_detections): | |
| coords = d["gps"]["coords"] | |
| lat, lon = coords | |
| # Determine marker color based on severity | |
| severity = d.get("severity", "LOW") | |
| color_map = { | |
| "HIGH": "red", | |
| "MEDIUM": "orange", | |
| "LOW": "yellow", | |
| "NONE": "gray" | |
| } | |
| color = color_map.get(severity, "blue") | |
| # Create detailed popup | |
| popup_html = f""" | |
| <div style='width: 200px'> | |
| <h4 style='margin: 5px 0'>Detection #{idx + 1}</h4> | |
| <b>Severity:</b> {severity}<br> | |
| <b>Items:</b> {d.get('count', 0)}<br> | |
| <b>Time:</b> {d.get('timestamp', 'Unknown')}<br> | |
| <b>GPS:</b> {lat:.6f}, {lon:.6f}<br> | |
| <a href='https://maps.google.com/?q={lat},{lon}' target='_blank'>View on Google Maps</a> | |
| </div> | |
| """ | |
| # Add marker | |
| folium.Marker( | |
| location=[lat, lon], | |
| popup=folium.Popup(popup_html, max_width=250), | |
| tooltip=f"{severity} - {d.get('count', 0)} items", | |
| icon=folium.Icon(color=color, icon='trash' if severity != 'NONE' else 'info-sign') | |
| ).add_to(m) | |
| # Fit map to show all markers | |
| if len(gps_detections) > 1: | |
| sw = [min(d["gps"]["coords"][0] for d in gps_detections), | |
| min(d["gps"]["coords"][1] for d in gps_detections)] | |
| ne = [max(d["gps"]["coords"][0] for d in gps_detections), | |
| max(d["gps"]["coords"][1] for d in gps_detections)] | |
| m.fit_bounds([sw, ne]) | |
| return m._repr_html_() | |
| # ---------------- Stats ---------------- | |
| def generate_statistics(): | |
| """Generate statistics from detection history""" | |
| history = load_detection_history() | |
| if not history: | |
| return {"total": 0, "items": 0, "high": 0, "medium": 0, "low": 0, "with_gps": 0} | |
| stats = { | |
| "total": len(history), | |
| "items": sum(h.get("count", 0) for h in history), | |
| "high": sum(1 for h in history if h.get("severity") == "HIGH"), | |
| "medium": sum(1 for h in history if h.get("severity") == "MEDIUM"), | |
| "low": sum(1 for h in history if h.get("severity") == "LOW"), | |
| "with_gps": sum(1 for h in history if h.get("gps") and h["gps"].get("coords")) | |
| } | |
| return stats | |
| def format_statistics_text(stats): | |
| """Format statistics for display""" | |
| if stats["total"] == 0: | |
| return "📊 **No detections yet**\n\nStart by uploading an image with the Detect tab" | |
| gps_percent = (stats["with_gps"] / stats["total"] * 100) if stats["total"] > 0 else 0 | |
| return f"""📊 **Detection Statistics** | |
| **Total Reports:** {stats['total']} | |
| **Total Items Detected:** {stats['items']} | |
| **Reports with GPS:** {stats['with_gps']} ({gps_percent:.0f}%) | |
| **Severity Breakdown:** | |
| 🔴 High: {stats['high']} reports | |
| 🟠 Medium: {stats['medium']} reports | |
| 🟡 Low: {stats['low']} reports | |
| **Average items per report:** {stats['items']/stats['total']:.1f} | |
| """ | |
| import exifread | |
| def dump_exif_tags(image_path): | |
| """Return all EXIF tags from the image as a formatted string for debugging""" | |
| tags_text = [] | |
| try: | |
| with open(image_path, "rb") as f: | |
| tags = exifread.process_file(f, details=False) | |
| if not tags: | |
| return "⚠ No EXIF data found." | |
| for tag, value in tags.items(): | |
| tags_text.append(f"{tag}: {value}") | |
| except Exception as e: | |
| return f"❌ Failed to read EXIF: {e}" | |
| return "\n".join(tags_text) | |