Spaces:
Running
Running
| import os, shutil, subprocess | |
| def has_ffmpeg() -> bool: | |
| return shutil.which("ffmpeg") is not None and shutil.which("ffprobe") is not None | |
| def probe_duration(video_path: str) -> float | None: | |
| if not shutil.which("ffprobe"): | |
| return None | |
| try: | |
| out = subprocess.check_output( | |
| ["ffprobe","-v","error","-select_streams","v:0","-show_entries","stream=duration","-of","default=nw=1:nk=1",video_path], | |
| stderr=subprocess.STDOUT, text=True | |
| ) | |
| return float(out.strip()) | |
| except Exception: | |
| return None | |
| def extract_frames_ffmpeg(video_path: str, fps: float, out_dir: str) -> list[tuple[str, int]]: | |
| os.makedirs(out_dir, exist_ok=True) | |
| tpl = os.path.join(out_dir, "frame_%06d.jpg") | |
| subprocess.check_call( | |
| ["ffmpeg","-y","-i",video_path,"-vf",f"fps={fps}","-qscale:v","2",tpl], | |
| stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL | |
| ) | |
| frames = sorted([os.path.join(out_dir,f) for f in os.listdir(out_dir) if f.lower().endswith(".jpg")]) | |
| return [(p, i) for i, p in enumerate(frames)] | |
| def runs_from_indices(idxs: list[int]) -> list[tuple[int,int]]: | |
| if not idxs: return [] | |
| idxs = sorted(idxs) | |
| runs, s, prev = [], idxs[0], idxs[0] | |
| for x in idxs[1:]: | |
| if x == prev + 1: | |
| prev = x | |
| else: | |
| runs.append((s, prev)); s = prev = x | |
| runs.append((s, prev)) | |
| return runs | |
| def merge_seconds_union(all_indices: list[int], fps: float, pad: float = 0.25) -> list[tuple[float,float]]: | |
| if not all_indices: return [] | |
| runs = runs_from_indices(sorted(all_indices)) | |
| intervals = [] | |
| for a, b in runs: | |
| start = max(0.0, a / fps - pad) | |
| end = (b + 1) / fps + pad | |
| intervals.append((start, end)) | |
| merged = [] | |
| for s, e in sorted(intervals): | |
| if not merged or s > merged[-1][1]: | |
| merged.append((s, e)) | |
| else: | |
| merged[-1] = (merged[-1][0], max(merged[-1][1], e)) | |
| return merged | |
| def redact_with_ffmpeg(video_path: str, intervals: list[tuple[float,float]], out_path: str): | |
| if not intervals: | |
| shutil.copyfile(video_path, out_path); return | |
| parts = [f"between(t\\,{s:.3f}\\,{e:.3f})" for s, e in intervals] | |
| expr = f"not({' + '.join(parts)})" | |
| vf = f"select='{expr}',setpts=N/FRAME_RATE/TB" | |
| subprocess.check_call(["ffmpeg","-y","-i",video_path,"-vf",vf,"-an",out_path], | |
| stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) | |