|
|
import gradio as gr |
|
|
import requests |
|
|
import os |
|
|
import pandas as pd |
|
|
|
|
|
|
|
|
# Inference endpoint that analyze_log() posts prompts to.
API_URL = "https://router.huggingface.co/models/deepseek-ai/DeepSeek-V2"

# Authorization header sent with every request. The token is read from the
# HF_API_TOKEN environment variable when this module is imported; indexing
# os.environ (rather than .get) means a missing variable raises KeyError
# immediately instead of producing an unauthenticated request later.
headers = {"Authorization": "Bearer " + os.environ["HF_API_TOKEN"]}
|
|
|
|
|
def preprocess_csv(file_path):
    """Turn an alarm-log CSV into one readable sentence per row.

    The CSV must provide the columns ``Date``, ``Time``, ``State`` and
    ``Message Text``; any additional columns are ignored.

    Args:
        file_path: Anything ``pandas.read_csv`` accepts (a filesystem path
            or a file-like object).

    Returns:
        The formatted rows joined with newlines, in file order. An empty
        string when the file has a header but no data rows.

    Raises:
        KeyError: If one or more of the required columns is missing.
    """
    required = ("Date", "Time", "State", "Message Text")
    df = pd.read_csv(file_path)

    # Validate the schema up front so a malformed upload fails with a message
    # naming every missing column, even when the file has no data rows.
    missing = [col for col in required if col not in df.columns]
    if missing:
        raise KeyError(
            f"CSV is missing required column(s): {', '.join(missing)}"
        )

    # Walk the four columns in lockstep instead of using iterrows(), which
    # builds a Series per row and does not preserve per-column dtypes.
    columns = (df[col] for col in required)
    return "\n".join(
        f"On {day} at {clock}, state was {state} → {message}"
        for day, clock, state, message in zip(*columns)
    )
|
|
|
|
|
def analyze_log(file_path, mode):
    """Send an AHU alarm log to the hosted model and return its reply as text.

    The CSV is flattened with ``preprocess_csv``, prefixed with an
    instruction chosen by ``mode``, and posted to ``API_URL``.

    Args:
        file_path: Path of the uploaded CSV log, or ``None``/empty when
            nothing was uploaded.
        mode: ``"Summarize"``, ``"Highlight anomalies"`` or
            ``"Suggest maintenance"``. Any other value sends the log text
            with no instruction prefix.

    Returns:
        The response's ``generated_text`` when present (either at the top
        level or in the first element of a list), otherwise the JSON payload
        converted with ``str``. Failures (no file, unreadable CSV, network
        error, non-200 status, undecodable body) are reported as a
        human-readable message instead of being raised, so the caller
        always receives text to display.
    """
    if not file_path:
        return "No log file provided. Please upload a CSV file."

    try:
        text = preprocess_csv(file_path)
    except (OSError, KeyError, ValueError) as exc:
        # pandas parser/encoding errors derive from ValueError; a missing
        # column surfaces as KeyError.
        return f"Failed to read log file: {exc}"

    instructions = {
        "Summarize": "Summarize the following AHU alarm log:\n\n",
        "Highlight anomalies": (
            "Identify unusual or repeated alarms in this AHU log "
            "and explain possible causes:\n\n"
        ),
        "Suggest maintenance": (
            "Based on this AHU alarm log, suggest maintenance actions:\n\n"
        ),
    }
    prompt = instructions.get(mode, "") + text

    # requests applies no timeout by default, so an unresponsive endpoint
    # would block this call forever; bound the wait explicitly.
    request_timeout = 120
    try:
        response = requests.post(
            API_URL,
            headers=headers,
            json={"inputs": prompt},
            timeout=request_timeout,
        )
    except requests.RequestException as exc:
        return f"Request failed: {exc}"

    if response.status_code != 200:
        return f"Error {response.status_code}: {response.text}"

    try:
        result = response.json()
    except ValueError:
        return f"Failed to decode JSON. Raw response: {response.text}"

    # The payload is either a dict or a list whose first item is a dict.
    # Guard against empty lists and non-dict items before indexing.
    candidate = result[0] if isinstance(result, list) and result else result
    if isinstance(candidate, dict) and "generated_text" in candidate:
        return candidate["generated_text"]
    return str(result)
|
|
|
|
|
# Input widgets, in the positional order analyze_log(file_path, mode) expects.
_log_upload = gr.File(type="filepath", label="Upload Log File")
_mode_picker = gr.Dropdown(
    choices=["Summarize", "Highlight anomalies", "Suggest maintenance"],
    label="Analysis Mode",
)

# Web UI: one uploaded file plus an analysis mode in, plain text out.
iface = gr.Interface(
    fn=analyze_log,
    title="AHU Log Analyzer (DeepSeek API)",
    description="Upload your log file (CSV) and choose how you want it analyzed using DeepSeek via Hugging Face API.",
    inputs=[_log_upload, _mode_picker],
    outputs="text",
)

# Start the app only when this file is run as a script, not when imported.
if __name__ == "__main__":
    iface.launch()
|
|
|