content_moderation_demo / video_utils.py
onullusoy's picture
Upload 12 files
5e94db5 verified
raw
history blame
2.55 kB
import os, shutil, subprocess
def has_ffmpeg() -> bool:
    """Return True only when both ``ffmpeg`` and ``ffprobe`` are found on PATH."""
    required_tools = ("ffmpeg", "ffprobe")
    return all(shutil.which(tool) is not None for tool in required_tools)
def probe_duration(video_path: str) -> float | None:
if not shutil.which("ffprobe"):
return None
try:
out = subprocess.check_output(
["ffprobe","-v","error","-select_streams","v:0","-show_entries","stream=duration","-of","default=nw=1:nk=1",video_path],
stderr=subprocess.STDOUT, text=True
)
return float(out.strip())
except Exception:
return None
def extract_frames_ffmpeg(video_path: str, fps: float, out_dir: str) -> list[tuple[str, int]]:
    """Sample *video_path* at *fps* frames per second into JPEG files.

    Frames are written by ``ffmpeg`` into *out_dir* (created if missing) as
    ``frame_NNNNNN.jpg``. Files in *out_dir* that already match that naming
    scheme are deleted first, so frames left behind by an earlier run cannot
    leak into the result; any other file in the directory is left alone and
    ignored.

    Args:
        video_path: Path of the input video.
        fps: Sampling rate passed to ffmpeg's ``fps`` filter.
        out_dir: Directory that receives the extracted frames.

    Returns:
        ``(frame_path, index)`` pairs in frame order, with ``index`` counting
        from 0.

    Raises:
        subprocess.CalledProcessError: If ffmpeg exits with a non-zero status.
        OSError: If ffmpeg cannot be started or *out_dir* cannot be prepared.
    """
    prefix, suffix = "frame_", ".jpg"

    def frame_number(name: str) -> int | None:
        # Number embedded in a "frame_<digits>.jpg" name, else None.
        if not (name.startswith(prefix) and name.endswith(suffix)):
            return None
        digits = name[len(prefix):-len(suffix)]
        if digits.isascii() and digits.isdigit():
            return int(digits)
        return None

    os.makedirs(out_dir, exist_ok=True)

    # Drop stale frames: a previous, longer extraction would otherwise leave
    # trailing files that ffmpeg's -y overwrite never touches.
    for name in os.listdir(out_dir):
        if frame_number(name) is not None:
            os.remove(os.path.join(out_dir, name))

    tpl = os.path.join(out_dir, "frame_%06d.jpg")
    subprocess.check_call(
        ["ffmpeg", "-y", "-i", video_path, "-vf", f"fps={fps}", "-qscale:v", "2", tpl],
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
    )

    numbered = []
    for name in os.listdir(out_dir):
        number = frame_number(name)
        if number is not None:
            numbered.append((number, name))
    # Numeric sort: %06d is only a minimum width, so names can outgrow
    # six digits and would then sort incorrectly as plain strings.
    numbered.sort()
    return [(os.path.join(out_dir, name), i) for i, (_, name) in enumerate(numbered)]
def runs_from_indices(idxs: list[int]) -> list[tuple[int, int]]:
    """Group integers into maximal runs of consecutive values.

    The input may be unsorted and may contain duplicates; duplicates are
    collapsed so that they neither split a run nor create overlapping runs.

    Args:
        idxs: Integer indices in any order.

    Returns:
        Inclusive ``(first, last)`` pairs in ascending order, one per run.
        An empty input yields an empty list.
    """
    if not idxs:
        return []
    ordered = sorted(set(idxs))
    runs = []
    start = prev = ordered[0]
    for x in ordered[1:]:
        if x != prev + 1:
            # Gap found: close the current run and open a new one at x.
            runs.append((start, prev))
            start = x
        prev = x
    runs.append((start, prev))
    return runs
def merge_seconds_union(all_indices: list[int], fps: float, pad: float = 0.25) -> list[tuple[float,float]]:
    """Convert frame indices into a merged list of time intervals in seconds.

    Consecutive indices are grouped into runs via ``runs_from_indices``. A run
    ``(a, b)`` becomes the interval from ``a / fps`` to ``(b + 1) / fps``,
    widened by *pad* on both sides with the start clamped at 0. Intervals that
    overlap or touch are then fused.

    Args:
        all_indices: Frame indices, in any order.
        fps: Frame rate used to convert indices to seconds.
        pad: Seconds added before and after every run.

    Returns:
        ``(start, end)`` pairs sorted by start; empty when no indices are given.
    """
    if not all_indices:
        return []

    padded = sorted(
        (max(0.0, first / fps - pad), (last + 1) / fps + pad)
        for first, last in runs_from_indices(sorted(all_indices))
    )

    union = []
    for lo, hi in padded:
        if not union or lo > union[-1][1]:
            # Starts strictly after the previous interval ends: keep separate.
            union.append((lo, hi))
            continue
        prev_lo, prev_hi = union[-1]
        union[-1] = (prev_lo, max(prev_hi, hi))
    return union
def redact_with_ffmpeg(video_path: str, intervals: list[tuple[float,float]], out_path: str):
    """Write a copy of *video_path* to *out_path* with the given time ranges cut out.

    With no intervals the input file is copied unchanged. Otherwise ``ffmpeg``
    is run with a ``select`` filter that keeps only frames whose timestamp
    lies outside every interval, followed by ``setpts`` to renumber the
    remaining frames; the command is invoked with ``-an``.

    Args:
        video_path: Path of the source video.
        intervals: ``(start, end)`` pairs in seconds to remove.
        out_path: Destination path; ffmpeg is run with ``-y``.

    Raises:
        subprocess.CalledProcessError: If ffmpeg exits with a non-zero status.
    """
    if not intervals:
        shutil.copyfile(video_path, out_path)
        return

    # Commas are backslash-escaped so they survive ffmpeg's filter parsing.
    windows = " + ".join(
        f"between(t\\,{begin:.3f}\\,{finish:.3f})" for begin, finish in intervals
    )
    video_filter = f"select='not({windows})',setpts=N/FRAME_RATE/TB"
    command = ["ffmpeg", "-y", "-i", video_path, "-vf", video_filter, "-an", out_path]
    subprocess.check_call(
        command,
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
    )