| #### FACE_ENHANCER.PY CODE START ### | |
| import os | |
| import cv2 | |
| import torch | |
| import gfpgan | |
| from PIL import Image | |
| from upscaler.RealESRGAN import RealESRGAN | |
| from upscaler.codeformer import CodeFormerEnhancer | |
| def gfpgan_runner(img, model): | |
| _, imgs, _ = model.enhance(img, paste_back=True, has_aligned=True) | |
| return imgs[0] | |
| def realesrgan_runner(img, model): | |
| img = model.predict(img) | |
| return img | |
| def codeformer_runner(img, model): | |
| img = model.enhance(img) | |
| return img | |
| supported_enhancers = { | |
| "CodeFormer": ("./assets/pretrained_models/codeformer.onnx", codeformer_runner), | |
| "GFPGAN": ("./assets/pretrained_models/GFPGANv1.4.pth", gfpgan_runner), | |
| "REAL-ESRGAN 2x": ("./assets/pretrained_models/RealESRGAN_x2.pth", realesrgan_runner), | |
| "REAL-ESRGAN 4x": ("./assets/pretrained_models/RealESRGAN_x4.pth", realesrgan_runner), | |
| "REAL-ESRGAN 8x": ("./assets/pretrained_models/RealESRGAN_x8.pth", realesrgan_runner) | |
| } | |
| cv2_interpolations = ["LANCZOS4", "CUBIC", "NEAREST"] | |
| def get_available_enhancer_names(): | |
| available = [] | |
| for name, data in supported_enhancers.items(): | |
| path = os.path.join(os.path.abspath(os.path.dirname(__file__)), data[0]) | |
| if os.path.exists(path): | |
| available.append(name) | |
| return available | |
| def load_face_enhancer_model(name='GFPGAN', device="cpu"): | |
| assert name in get_available_enhancer_names() + cv2_interpolations, f"Face enhancer {name} unavailable." | |
| if name in supported_enhancers.keys(): | |
| model_path, model_runner = supported_enhancers.get(name) | |
| model_path = os.path.join(os.path.abspath(os.path.dirname(__file__)), model_path) | |
| if name == 'CodeFormer': | |
| model = CodeFormerEnhancer(model_path=model_path, device=device) | |
| elif name == 'GFPGAN': | |
| model = gfpgan.GFPGANer(model_path=model_path, upscale=1, device=device) | |
| elif name == 'REAL-ESRGAN 2x': | |
| model = RealESRGAN(device, scale=2) | |
| model.load_weights(model_path, download=False) | |
| elif name == 'REAL-ESRGAN 4x': | |
| model = RealESRGAN(device, scale=4) | |
| model.load_weights(model_path, download=False) | |
| elif name == 'REAL-ESRGAN 8x': | |
| model = RealESRGAN(device, scale=8) | |
| model.load_weights(model_path, download=False) | |
| elif name == 'LANCZOS4': | |
| model = None | |
| model_runner = lambda img, _: cv2.resize(img, (512,512), interpolation=cv2.INTER_LANCZOS4) | |
| elif name == 'CUBIC': | |
| model = None | |
| model_runner = lambda img, _: cv2.resize(img, (512,512), interpolation=cv2.INTER_CUBIC) | |
| elif name == 'NEAREST': | |
| model = None | |
| model_runner = lambda img, _: cv2.resize(img, (512,512), interpolation=cv2.INTER_NEAREST) | |
| else: | |
| model = None | |
| return (model, model_runner) | |
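| # Usage sketch (illustrative; `face_img` is a placeholder for an already-cropped BGR face image): | |
| # load_face_enhancer_model returns a (model, runner) pair and each runner takes (image, model). | |
| # | |
| # if "GFPGAN" in get_available_enhancer_names(): | |
| #     enhancer, runner = load_face_enhancer_model(name="GFPGAN", device="cpu") | |
| #     restored_face = runner(face_img, enhancer) | |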
| #### FACE_ENHANCER.PY CODE END ### | |
| #### FACE_SWAPPER.PY CODE START ### | |
| import time | |
| import torch | |
| import onnx | |
| import cv2 | |
| import onnxruntime | |
| import numpy as np | |
| from tqdm import tqdm | |
| import torch.nn as nn | |
| from onnx import numpy_helper | |
| from skimage import transform as trans | |
| import torchvision.transforms.functional as VF  # aliased separately so it is not shadowed by the import below | |
| import torch.nn.functional as F | |
| from utils import mask_crop, laplacian_blending | |
| arcface_dst = np.array( | |
| [[38.2946, 51.6963], [73.5318, 51.5014], [56.0252, 71.7366], | |
| [41.5493, 92.3655], [70.7299, 92.2041]], | |
| dtype=np.float32) | |
| def estimate_norm(lmk, image_size=112, mode='arcface'): | |
| assert lmk.shape == (5, 2) | |
| assert image_size % 112 == 0 or image_size % 128 == 0 | |
| if image_size % 112 == 0: | |
| ratio = float(image_size) / 112.0 | |
| diff_x = 0 | |
| else: | |
| ratio = float(image_size) / 128.0 | |
| diff_x = 8.0 * ratio | |
| dst = arcface_dst * ratio | |
| dst[:, 0] += diff_x | |
| tform = trans.SimilarityTransform() | |
| tform.estimate(lmk, dst) | |
| M = tform.params[0:2, :] | |
| return M | |
| def norm_crop2(img, landmark, image_size=112, mode='arcface'): | |
| M = estimate_norm(landmark, image_size, mode) | |
| warped = cv2.warpAffine(img, M, (image_size, image_size), borderValue=0.0) | |
| return warped, M | |
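| # Usage sketch (illustrative): estimate_norm fits a similarity transform from the detected | |
| # 5-point landmarks (lmk, shape (5, 2)) to the canonical ArcFace template above; norm_crop2 | |
| # applies it to produce an aligned crop. | |
| # e.g. aligned, M = norm_crop2(frame_bgr, face.kps, image_size=128)  # as used by Inswapper below | |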
| class Inswapper(): | |
| def __init__(self, model_file=None, batch_size=32, providers=['CPUExecutionProvider']): | |
| self.model_file = model_file | |
| self.batch_size = batch_size | |
| model = onnx.load(self.model_file) | |
| graph = model.graph | |
| self.emap = numpy_helper.to_array(graph.initializer[-1]) | |
| self.session_options = onnxruntime.SessionOptions() | |
| self.session = onnxruntime.InferenceSession(self.model_file, sess_options=self.session_options, providers=providers) | |
| def forward(self, imgs, latents): | |
| preds = [] | |
| for img, latent in zip(imgs, latents): | |
| img = img / 255 | |
| pred = self.session.run(['output'], {'target': img, 'source': latent})[0] | |
| preds.append(pred) | |
| return preds | |
| def get(self, imgs, target_faces, source_faces): | |
| imgs = list(imgs) | |
| preds = [None] * len(imgs) | |
| matrs = [None] * len(imgs) | |
| for idx, (img, target_face, source_face) in enumerate(zip(imgs, target_faces, source_faces)): | |
| matrix, blob, latent = self.prepare_data(img, target_face, source_face) | |
| pred = self.session.run(['output'], {'target': blob, 'source': latent})[0] | |
| pred = pred.transpose((0, 2, 3, 1))[0] | |
| pred = np.clip(255 * pred, 0, 255).astype(np.uint8)[:, :, ::-1] | |
| preds[idx] = pred | |
| matrs[idx] = matrix | |
| return (preds, matrs) | |
| def prepare_data(self, img, target_face, source_face): | |
| if isinstance(img, str): | |
| img = cv2.imread(img) | |
| aligned_img, matrix = norm_crop2(img, target_face.kps, 128) | |
| blob = cv2.dnn.blobFromImage(aligned_img, 1.0 / 255, (128, 128), (0., 0., 0.), swapRB=True) | |
| latent = source_face.normed_embedding.reshape((1, -1)) | |
| latent = np.dot(latent, self.emap) | |
| latent /= np.linalg.norm(latent) | |
| return (matrix, blob, latent) | |
| def batch_forward(self, img_list, target_f_list, source_f_list): | |
| num_samples = len(img_list) | |
| num_batches = (num_samples + self.batch_size - 1) // self.batch_size | |
| for i in tqdm(range(num_batches), desc="Generating face"): | |
| start_idx = i * self.batch_size | |
| end_idx = min((i + 1) * self.batch_size, num_samples) | |
| batch_img = img_list[start_idx:end_idx] | |
| batch_target_f = target_f_list[start_idx:end_idx] | |
| batch_source_f = source_f_list[start_idx:end_idx] | |
| batch_pred, batch_matr = self.get(batch_img, batch_target_f, batch_source_f) | |
| yield batch_pred, batch_matr | |
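| # Usage sketch (illustrative; `frame_paths`, `target_faces` and `source_faces` are placeholders: | |
| # a list of image paths and two equally long lists of insightface face objects, which app.py | |
| # builds via get_analysed_data): | |
| # | |
| # swapper = Inswapper(model_file="./assets/pretrained_models/inswapper_128.onnx", batch_size=32) | |
| # for batch_preds, batch_matrices in swapper.batch_forward(frame_paths, target_faces, source_faces): | |
| #     ...  # each pred is a 128x128 BGR face crop; each matrix is the frame-to-crop alignment | |
| #          # transform later consumed by paste_to_whole | |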
| def paste_to_whole(foreground, background, matrix, mask=None, crop_mask=(0,0,0,0), blur_amount=0.1, erode_amount = 0.15, blend_method='linear'): | |
| inv_matrix = cv2.invertAffineTransform(matrix) | |
| fg_shape = foreground.shape[:2] | |
| bg_shape = (background.shape[1], background.shape[0]) | |
| foreground = cv2.warpAffine(foreground, inv_matrix, bg_shape, borderValue=0.0) | |
| if mask is None: | |
| mask = np.full(fg_shape, 1., dtype=np.float32) | |
| mask = mask_crop(mask, crop_mask) | |
| mask = cv2.warpAffine(mask, inv_matrix, bg_shape, borderValue=0.0) | |
| else: | |
| assert fg_shape == mask.shape[:2], "foreground & mask shape mismatch!" | |
| mask = mask_crop(mask, crop_mask).astype('float32') | |
| mask = cv2.warpAffine(mask, inv_matrix, (background.shape[1], background.shape[0]), borderValue=0.0) | |
| _mask = mask.copy() | |
| _mask[_mask > 0.05] = 1. | |
| non_zero_points = cv2.findNonZero(_mask) | |
| _, _, w, h = cv2.boundingRect(non_zero_points) | |
| mask_size = int(np.sqrt(w * h)) | |
| if erode_amount > 0: | |
| kernel_size = max(int(mask_size * erode_amount), 1) | |
| structuring_element = cv2.getStructuringElement(cv2.MORPH_RECT, (kernel_size, kernel_size)) | |
| mask = cv2.erode(mask, structuring_element) | |
| if blur_amount > 0: | |
| kernel_size = max(int(mask_size * blur_amount), 3) | |
| if kernel_size % 2 == 0: | |
| kernel_size += 1 | |
| mask = cv2.GaussianBlur(mask, (kernel_size, kernel_size), 0) | |
| mask = np.tile(np.expand_dims(mask, axis=-1), (1, 1, 3)) | |
| if blend_method == 'laplacian': | |
| composite_image = laplacian_blending(foreground, background, mask.clip(0,1), num_levels=4) | |
| else: | |
| composite_image = mask * foreground + (1 - mask) * background | |
| return composite_image.astype("uint8").clip(0, 255) | |
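| # Usage sketch (illustrative): paste_to_whole inverts the alignment matrix, warps the swapped | |
| # crop and its mask back onto the full frame, erodes/blurs the mask relative to the face size, | |
| # then blends linearly or with laplacian_blending. The matrix must match the crop's current resolution. | |
| # e.g. frame = paste_to_whole(swapped_face, frame, matrix, blend_method='laplacian') | |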
| #### FACE_SWAPPER.PY CODE END ### | |
| #### FACE_ANALYSER.PY CODE START ### | |
| import os | |
| import cv2 | |
| import numpy as np | |
| from tqdm import tqdm | |
| from utils import scale_bbox_from_center | |
| detect_conditions = [ | |
| "best detection", | |
| "left most", | |
| "right most", | |
| "top most", | |
| "bottom most", | |
| "middle", | |
| "biggest", | |
| "smallest", | |
| ] | |
| swap_options_list = [ | |
| "All Face", | |
| "Specific Face", | |
| "Age less than", | |
| "Age greater than", | |
| "All Male", | |
| "All Female", | |
| "Left Most", | |
| "Right Most", | |
| "Top Most", | |
| "Bottom Most", | |
| "Middle", | |
| "Biggest", | |
| "Smallest", | |
| ] | |
| def get_single_face(faces, method="best detection"): | |
| total_faces = len(faces) | |
| if total_faces == 1: | |
| return faces[0] | |
| print(f"{total_faces} face detected. Using {method} face.") | |
| if method == "best detection": | |
| return sorted(faces, key=lambda face: face["det_score"])[-1] | |
| elif method == "left most": | |
| return sorted(faces, key=lambda face: face["bbox"][0])[0] | |
| elif method == "right most": | |
| return sorted(faces, key=lambda face: face["bbox"][0])[-1] | |
| elif method == "top most": | |
| return sorted(faces, key=lambda face: face["bbox"][1])[0] | |
| elif method == "bottom most": | |
| return sorted(faces, key=lambda face: face["bbox"][1])[-1] | |
| elif method == "middle": | |
| return sorted(faces, key=lambda face: ( | |
| (face["bbox"][0] + face["bbox"][2]) / 2 - 0.5) ** 2 + | |
| ((face["bbox"][1] + face["bbox"][3]) / 2 - 0.5) ** 2)[len(faces) // 2] | |
| elif method == "biggest": | |
| return sorted(faces, key=lambda face: (face["bbox"][2] - face["bbox"][0]) * (face["bbox"][3] - face["bbox"][1]))[-1] | |
| elif method == "smallest": | |
| return sorted(faces, key=lambda face: (face["bbox"][2] - face["bbox"][0]) * (face["bbox"][3] - face["bbox"][1]))[0] | |
| def analyse_face(image, model, return_single_face=True, detect_condition="best detection", scale=1.0): | |
| faces = model.get(image) | |
| if scale != 1: # landmark-scale | |
| for i, face in enumerate(faces): | |
| landmark = face['kps'] | |
| center = np.mean(landmark, axis=0) | |
| landmark = center + (landmark - center) * scale | |
| faces[i]['kps'] = landmark | |
| if not return_single_face: | |
| return faces | |
| return get_single_face(faces, method=detect_condition) | |
| def cosine_distance(a, b): | |
| # normalize copies so the caller's embedding arrays are not modified in place | |
| a = a / np.linalg.norm(a) | |
| b = b / np.linalg.norm(b) | |
| return 1 - np.dot(a, b) | |
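| # Note: identical embeddings give a distance of 0.0 and orthogonal embeddings give 1.0; the | |
| # "Specific Face" branch below treats a distance under the user-set threshold (default 0.6) as a match. | |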
| def get_analysed_data(face_analyser, image_sequence, source_data, swap_condition="All Face", detect_condition="left most", scale=1.0): | |
| if swap_condition != "Specific Face": | |
| source_path, age = source_data | |
| source_image = cv2.imread(source_path) | |
| analysed_source = analyse_face(source_image, face_analyser, return_single_face=True, detect_condition=detect_condition, scale=scale) | |
| else: | |
| analysed_source_specifics = [] | |
| source_specifics, threshold = source_data | |
| for source, specific in zip(*source_specifics): | |
| if source is None or specific is None: | |
| continue | |
| analysed_source = analyse_face(source, face_analyser, return_single_face=True, detect_condition=detect_condition, scale=scale) | |
| analysed_specific = analyse_face(specific, face_analyser, return_single_face=True, detect_condition=detect_condition, scale=scale) | |
| analysed_source_specifics.append([analysed_source, analysed_specific]) | |
| analysed_target_list = [] | |
| analysed_source_list = [] | |
| whole_frame_eql_list = [] | |
| num_faces_per_frame = [] | |
| total_frames = len(image_sequence) | |
| curr_idx = 0 | |
| for curr_idx, frame_path in tqdm(enumerate(image_sequence), total=total_frames, desc="Analysing face data"): | |
| frame = cv2.imread(frame_path) | |
| analysed_faces = analyse_face(frame, face_analyser, return_single_face=False, detect_condition=detect_condition, scale=scale) | |
| n_faces = 0 | |
| for analysed_face in analysed_faces: | |
| if swap_condition == "All Face": | |
| analysed_target_list.append(analysed_face) | |
| analysed_source_list.append(analysed_source) | |
| whole_frame_eql_list.append(frame_path) | |
| n_faces += 1 | |
| elif swap_condition == "Age less than" and analysed_face["age"] < age: | |
| analysed_target_list.append(analysed_face) | |
| analysed_source_list.append(analysed_source) | |
| whole_frame_eql_list.append(frame_path) | |
| n_faces += 1 | |
| elif swap_condition == "Age greater than" and analysed_face["age"] > age: | |
| analysed_target_list.append(analysed_face) | |
| analysed_source_list.append(analysed_source) | |
| whole_frame_eql_list.append(frame_path) | |
| n_faces += 1 | |
| elif swap_condition == "All Male" and analysed_face["gender"] == 1: | |
| analysed_target_list.append(analysed_face) | |
| analysed_source_list.append(analysed_source) | |
| whole_frame_eql_list.append(frame_path) | |
| n_faces += 1 | |
| elif swap_condition == "All Female" and analysed_face["gender"] == 0: | |
| analysed_target_list.append(analysed_face) | |
| analysed_source_list.append(analysed_source) | |
| whole_frame_eql_list.append(frame_path) | |
| n_faces += 1 | |
| elif swap_condition == "Specific Face": | |
| for analysed_source, analysed_specific in analysed_source_specifics: | |
| distance = cosine_distance(analysed_specific["embedding"], analysed_face["embedding"]) | |
| if distance < threshold: | |
| analysed_target_list.append(analysed_face) | |
| analysed_source_list.append(analysed_source) | |
| whole_frame_eql_list.append(frame_path) | |
| n_faces += 1 | |
| if swap_condition == "Left Most": | |
| analysed_face = get_single_face(analysed_faces, method="left most") | |
| analysed_target_list.append(analysed_face) | |
| analysed_source_list.append(analysed_source) | |
| whole_frame_eql_list.append(frame_path) | |
| n_faces += 1 | |
| elif swap_condition == "Right Most": | |
| analysed_face = get_single_face(analysed_faces, method="right most") | |
| analysed_target_list.append(analysed_face) | |
| analysed_source_list.append(analysed_source) | |
| whole_frame_eql_list.append(frame_path) | |
| n_faces += 1 | |
| elif swap_condition == "Top Most": | |
| analysed_face = get_single_face(analysed_faces, method="top most") | |
| analysed_target_list.append(analysed_face) | |
| analysed_source_list.append(analysed_source) | |
| whole_frame_eql_list.append(frame_path) | |
| n_faces += 1 | |
| elif swap_condition == "Bottom Most": | |
| analysed_face = get_single_face(analysed_faces, method="bottom most") | |
| analysed_target_list.append(analysed_face) | |
| analysed_source_list.append(analysed_source) | |
| whole_frame_eql_list.append(frame_path) | |
| n_faces += 1 | |
| elif swap_condition == "Middle": | |
| analysed_face = get_single_face(analysed_faces, method="middle") | |
| analysed_target_list.append(analysed_face) | |
| analysed_source_list.append(analysed_source) | |
| whole_frame_eql_list.append(frame_path) | |
| n_faces += 1 | |
| elif swap_condition == "Biggest": | |
| analysed_face = get_single_face(analysed_faces, method="biggest") | |
| analysed_target_list.append(analysed_face) | |
| analysed_source_list.append(analysed_source) | |
| whole_frame_eql_list.append(frame_path) | |
| n_faces += 1 | |
| elif swap_condition == "Smallest": | |
| analysed_face = get_single_face(analysed_faces, method="smallest") | |
| analysed_target_list.append(analysed_face) | |
| analysed_source_list.append(analysed_source) | |
| whole_frame_eql_list.append(frame_path) | |
| n_faces += 1 | |
| num_faces_per_frame.append(n_faces) | |
| return analysed_target_list, analysed_source_list, whole_frame_eql_list, num_faces_per_frame | |
| #### FACE_ANALYSER.PY CODE END ### | |
| #### UTILS.PY CODE START ### | |
| import os | |
| import cv2 | |
| import time | |
| import glob | |
| import shutil | |
| import platform | |
| import datetime | |
| import subprocess | |
| import numpy as np | |
| from threading import Thread | |
| from moviepy.editor import VideoFileClip, ImageSequenceClip | |
| from moviepy.video.io.ffmpeg_tools import ffmpeg_extract_subclip | |
| logo_image = cv2.imread("./assets/images/logo.png", cv2.IMREAD_UNCHANGED) | |
| quality_types = ["poor", "low", "medium", "high", "best"] | |
| bitrate_quality_by_resolution = { | |
| 240: {"poor": "300k", "low": "500k", "medium": "800k", "high": "1000k", "best": "1200k"}, | |
| 360: {"poor": "500k","low": "800k","medium": "1200k","high": "1500k","best": "2000k"}, | |
| 480: {"poor": "800k","low": "1200k","medium": "2000k","high": "2500k","best": "3000k"}, | |
| 720: {"poor": "1500k","low": "2500k","medium": "4000k","high": "5000k","best": "6000k"}, | |
| 1080: {"poor": "2500k","low": "4000k","medium": "6000k","high": "7000k","best": "8000k"}, | |
| 1440: {"poor": "4000k","low": "6000k","medium": "8000k","high": "10000k","best": "12000k"}, | |
| 2160: {"poor": "8000k","low": "10000k","medium": "12000k","high": "15000k","best": "20000k"} | |
| } | |
| crf_quality_by_resolution = { | |
| 240: {"poor": 45, "low": 35, "medium": 28, "high": 23, "best": 20}, | |
| 360: {"poor": 35, "low": 28, "medium": 23, "high": 20, "best": 18}, | |
| 480: {"poor": 28, "low": 23, "medium": 20, "high": 18, "best": 16}, | |
| 720: {"poor": 23, "low": 20, "medium": 18, "high": 16, "best": 14}, | |
| 1080: {"poor": 20, "low": 18, "medium": 16, "high": 14, "best": 12}, | |
| 1440: {"poor": 18, "low": 16, "medium": 14, "high": 12, "best": 10}, | |
| 2160: {"poor": 16, "low": 14, "medium": 12, "high": 10, "best": 8} | |
| } | |
| def get_bitrate_for_resolution(resolution, quality): | |
| available_resolutions = list(bitrate_quality_by_resolution.keys()) | |
| closest_resolution = min(available_resolutions, key=lambda x: abs(x - resolution)) | |
| return bitrate_quality_by_resolution[closest_resolution][quality] | |
| def get_crf_for_resolution(resolution, quality): | |
| available_resolutions = list(crf_quality_by_resolution.keys()) | |
| closest_resolution = min(available_resolutions, key=lambda x: abs(x - resolution)) | |
| return crf_quality_by_resolution[closest_resolution][quality] | |
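| # Worked example: both helpers snap to the nearest listed resolution, so | |
| # get_bitrate_for_resolution(1080, "high") -> "7000k" and get_crf_for_resolution(480, "medium") -> 20 | |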
| def get_video_bitrate(video_file): | |
| ffprobe_cmd = ['ffprobe', '-v', 'error', '-select_streams', 'v:0', '-show_entries', | |
| 'stream=bit_rate', '-of', 'default=noprint_wrappers=1:nokey=1', video_file] | |
| result = subprocess.run(ffprobe_cmd, stdout=subprocess.PIPE) | |
| kbps = max(int(result.stdout) // 1000, 10) | |
| return str(kbps) + 'k' | |
| def trim_video(video_path, output_path, start_frame, stop_frame): | |
| video_name, _ = os.path.splitext(os.path.basename(video_path)) | |
| trimmed_video_filename = video_name + "_trimmed" + ".mp4" | |
| temp_path = os.path.join(output_path, "trim") | |
| os.makedirs(temp_path, exist_ok=True) | |
| trimmed_video_file_path = os.path.join(temp_path, trimmed_video_filename) | |
| video = VideoFileClip(video_path, fps_source="fps") | |
| fps = video.fps | |
| start_time = start_frame / fps | |
| duration = (stop_frame - start_frame) / fps | |
| bitrate = get_bitrate_for_resolution(min(*video.size), "high") | |
| trimmed_video = video.subclip(start_time, start_time + duration) | |
| trimmed_video.write_videofile( | |
| trimmed_video_file_path, codec="libx264", audio_codec="aac", bitrate=bitrate, | |
| ) | |
| trimmed_video.close() | |
| video.close() | |
| return trimmed_video_file_path | |
| def open_directory(path=None): | |
| if path is None: | |
| return | |
| try: | |
| os.startfile(path) | |
| except: | |
| subprocess.Popen(["xdg-open", path]) | |
| class StreamerThread(object): | |
| def __init__(self, src=0): | |
| self.capture = cv2.VideoCapture(src) | |
| self.capture.set(cv2.CAP_PROP_BUFFERSIZE, 2) | |
| self.FPS = 1 / 30 | |
| self.FPS_MS = int(self.FPS * 1000) | |
| self.thread = None | |
| self.stopped = False | |
| self.frame = None | |
| def start(self): | |
| self.thread = Thread(target=self.update, args=()) | |
| self.thread.daemon = True | |
| self.thread.start() | |
| def stop(self): | |
| self.stopped = True | |
| self.thread.join() | |
| print("stopped") | |
| def update(self): | |
| while not self.stopped: | |
| if self.capture.isOpened(): | |
| (self.status, self.frame) = self.capture.read() | |
| time.sleep(self.FPS) | |
| class ProcessBar: | |
| def __init__(self, bar_length, total, before="β¬", after="π¨"): | |
| self.bar_length = bar_length | |
| self.total = total | |
| self.before = before | |
| self.after = after | |
| self.bar = [self.before] * bar_length | |
| self.start_time = time.time() | |
| def get(self, index): | |
| total = self.total | |
| elapsed_time = time.time() - self.start_time | |
| average_time_per_iteration = elapsed_time / (index + 1) | |
| remaining_iterations = total - (index + 1) | |
| estimated_remaining_time = remaining_iterations * average_time_per_iteration | |
| self.bar[int(index / total * self.bar_length)] = self.after | |
| info_text = f"({index+1}/{total}) {''.join(self.bar)} " | |
| info_text += f"(ETR: {int(estimated_remaining_time // 60)} min {int(estimated_remaining_time % 60)} sec)" | |
| return info_text | |
| def add_logo_to_image(img, logo=logo_image): | |
| logo_size = int(img.shape[1] * 0.1) | |
| logo = cv2.resize(logo, (logo_size, logo_size)) | |
| if logo.shape[2] == 4: | |
| alpha = logo[:, :, 3] | |
| else: | |
| alpha = np.ones_like(logo[:, :, 0]) * 255 | |
| padding = int(logo_size * 0.1) | |
| roi = img.shape[0] - logo_size - padding, img.shape[1] - logo_size - padding | |
| for c in range(0, 3): | |
| img[roi[0] : roi[0] + logo_size, roi[1] : roi[1] + logo_size, c] = ( | |
| alpha / 255.0 | |
| ) * logo[:, :, c] + (1 - alpha / 255.0) * img[ | |
| roi[0] : roi[0] + logo_size, roi[1] : roi[1] + logo_size, c | |
| ] | |
| return img | |
| def split_list_by_lengths(data, length_list): | |
| split_data = [] | |
| start_idx = 0 | |
| for length in length_list: | |
| end_idx = start_idx + length | |
| sublist = data[start_idx:end_idx] | |
| split_data.append(sublist) | |
| start_idx = end_idx | |
| return split_data | |
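| # Worked example: split_list_by_lengths(["a", "b", "c", "d"], [1, 3]) -> [["a"], ["b", "c", "d"]]; | |
| # app.py uses this to regroup per-face predictions into per-frame lists via num_faces_per_frame. | |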
| def merge_img_sequence_from_ref(ref_video_path, image_sequence, output_file_name): | |
| video_clip = VideoFileClip(ref_video_path, fps_source="fps") | |
| fps = video_clip.fps | |
| duration = video_clip.duration | |
| total_frames = video_clip.reader.nframes | |
| audio_clip = video_clip.audio if video_clip.audio is not None else None | |
| edited_video_clip = ImageSequenceClip(image_sequence, fps=fps) | |
| if audio_clip is not None: | |
| edited_video_clip = edited_video_clip.set_audio(audio_clip) | |
| bitrate = get_bitrate_for_resolution(min(*edited_video_clip.size), "high") | |
| edited_video_clip.set_duration(duration).write_videofile( | |
| output_file_name, codec="libx264", bitrate=bitrate, | |
| ) | |
| edited_video_clip.close() | |
| video_clip.close() | |
| def scale_bbox_from_center(bbox, scale_width, scale_height, image_width, image_height): | |
| # Extract the coordinates of the bbox | |
| x1, y1, x2, y2 = bbox | |
| # Calculate the center point of the bbox | |
| center_x = (x1 + x2) / 2 | |
| center_y = (y1 + y2) / 2 | |
| # Calculate the new width and height of the bbox based on the scaling factors | |
| width = x2 - x1 | |
| height = y2 - y1 | |
| new_width = width * scale_width | |
| new_height = height * scale_height | |
| # Calculate the new coordinates of the bbox, considering the image boundaries | |
| new_x1 = center_x - new_width / 2 | |
| new_y1 = center_y - new_height / 2 | |
| new_x2 = center_x + new_width / 2 | |
| new_y2 = center_y + new_height / 2 | |
| # Adjust the coordinates to ensure the bbox remains within the image boundaries | |
| new_x1 = max(0, new_x1) | |
| new_y1 = max(0, new_y1) | |
| new_x2 = min(image_width - 1, new_x2) | |
| new_y2 = min(image_height - 1, new_y2) | |
| # Return the scaled bbox coordinates | |
| scaled_bbox = [new_x1, new_y1, new_x2, new_y2] | |
| return scaled_bbox | |
| def laplacian_blending(A, B, m, num_levels=7): | |
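| # Overview: pads A, B and the mask m to the next power-of-two square, builds Gaussian pyramids, | |
| # derives Laplacian pyramids for A and B, blends each level with the mask pyramid, then collapses | |
| # the pyramid and crops back to the original height x width. | |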
| assert A.shape == B.shape | |
| assert B.shape == m.shape | |
| height = m.shape[0] | |
| width = m.shape[1] | |
| size_list = np.array([4, 8, 16, 32, 64, 128, 256, 512, 1024, 2048, 4096, 8192]) | |
| size = size_list[np.where(size_list > max(height, width))][0] | |
| GA = np.zeros((size, size, 3), dtype=np.float32) | |
| GA[:height, :width, :] = A | |
| GB = np.zeros((size, size, 3), dtype=np.float32) | |
| GB[:height, :width, :] = B | |
| GM = np.zeros((size, size, 3), dtype=np.float32) | |
| GM[:height, :width, :] = m | |
| gpA = [GA] | |
| gpB = [GB] | |
| gpM = [GM] | |
| for i in range(num_levels): | |
| GA = cv2.pyrDown(GA) | |
| GB = cv2.pyrDown(GB) | |
| GM = cv2.pyrDown(GM) | |
| gpA.append(np.float32(GA)) | |
| gpB.append(np.float32(GB)) | |
| gpM.append(np.float32(GM)) | |
| lpA = [gpA[num_levels-1]] | |
| lpB = [gpB[num_levels-1]] | |
| gpMr = [gpM[num_levels-1]] | |
| for i in range(num_levels-1,0,-1): | |
| LA = np.subtract(gpA[i-1], cv2.pyrUp(gpA[i])) | |
| LB = np.subtract(gpB[i-1], cv2.pyrUp(gpB[i])) | |
| lpA.append(LA) | |
| lpB.append(LB) | |
| gpMr.append(gpM[i-1]) | |
| LS = [] | |
| for la,lb,gm in zip(lpA,lpB,gpMr): | |
| ls = la * gm + lb * (1.0 - gm) | |
| LS.append(ls) | |
| ls_ = LS[0] | |
| for i in range(1,num_levels): | |
| ls_ = cv2.pyrUp(ls_) | |
| ls_ = cv2.add(ls_, LS[i]) | |
| ls_ = ls_[:height, :width, :] | |
| #ls_ = (ls_ - np.min(ls_)) * (255.0 / (np.max(ls_) - np.min(ls_))) | |
| return ls_.clip(0, 255) | |
| def mask_crop(mask, crop): | |
| top, bottom, left, right = crop | |
| shape = mask.shape | |
| top = int(top) | |
| bottom = int(bottom) | |
| if top + bottom < shape[0]: | |
| if top > 0: mask[:top, :] = 0 | |
| if bottom > 0: mask[-bottom:, :] = 0 | |
| left = int(left) | |
| right = int(right) | |
| if left + right < shape[1]: | |
| if left > 0: mask[:, :left] = 0 | |
| if right > 0: mask[:, -right:] = 0 | |
| return mask | |
| def create_image_grid(images, size=128): | |
| num_images = len(images) | |
| num_cols = int(np.ceil(np.sqrt(num_images))) | |
| num_rows = int(np.ceil(num_images / num_cols)) | |
| grid = np.zeros((num_rows * size, num_cols * size, 3), dtype=np.uint8) | |
| for i, image in enumerate(images): | |
| row_idx = (i // num_cols) * size | |
| col_idx = (i % num_cols) * size | |
| image = cv2.resize(image.copy(), (size,size)) | |
| if image.dtype != np.uint8: | |
| image = (image.astype('float32') * 255).astype('uint8') | |
| if image.ndim == 2: | |
| image = cv2.cvtColor(image, cv2.COLOR_GRAY2BGR) | |
| grid[row_idx:row_idx + size, col_idx:col_idx + size] = image | |
| return grid | |
| #### UTILS.PY CODE END ### | |
| #### APP.PY CODE START ### | |
| import os | |
| import spaces | |
| import cv2 | |
| import glob | |
| import time | |
| import torch | |
| import shutil | |
| import argparse | |
| import platform | |
| import datetime | |
| import subprocess | |
| import insightface | |
| import onnxruntime | |
| import numpy as np | |
| import gradio as gr | |
| import threading | |
| import queue | |
| from tqdm import tqdm | |
| import concurrent.futures | |
| from moviepy.editor import VideoFileClip | |
| from nsfw_checker import NSFWChecker | |
| from face_swapper import Inswapper, paste_to_whole | |
| from face_analyser import detect_conditions, get_analysed_data, swap_options_list | |
| from face_parsing import init_parsing_model, get_parsed_mask, mask_regions, mask_regions_to_list | |
| from face_enhancer import get_available_enhancer_names, load_face_enhancer_model, cv2_interpolations | |
| from utils import trim_video, StreamerThread, ProcessBar, open_directory, split_list_by_lengths, merge_img_sequence_from_ref, create_image_grid | |
| ## ------------------------------ USER ARGS ------------------------------ | |
| parser = argparse.ArgumentParser(description="Swap-Mukham Face Swapper") | |
| parser.add_argument("--out_dir", help="Default Output directory", default=os.getcwd()) | |
| parser.add_argument("--batch_size", help="Gpu batch size", default=32) | |
| parser.add_argument("--cuda", action="store_true", help="Enable cuda", default=False) | |
| parser.add_argument( | |
| "--colab", action="store_true", help="Enable colab mode", default=False | |
| ) | |
| user_args = parser.parse_args() | |
| ## ------------------------------ DEFAULTS ------------------------------ | |
| USE_COLAB = user_args.colab | |
| USE_CUDA = user_args.cuda | |
| DEF_OUTPUT_PATH = user_args.out_dir | |
| BATCH_SIZE = int(user_args.batch_size) | |
| WORKSPACE = None | |
| OUTPUT_FILE = None | |
| CURRENT_FRAME = None | |
| STREAMER = None | |
| DETECT_CONDITION = "best detection" | |
| DETECT_SIZE = 640 | |
| DETECT_THRESH = 0.6 | |
| NUM_OF_SRC_SPECIFIC = 10 | |
| MASK_INCLUDE = [ | |
| "Skin", | |
| "R-Eyebrow", | |
| "L-Eyebrow", | |
| "L-Eye", | |
| "R-Eye", | |
| "Nose", | |
| "Mouth", | |
| "L-Lip", | |
| "U-Lip" | |
| ] | |
| MASK_SOFT_KERNEL = 17 | |
| MASK_SOFT_ITERATIONS = 10 | |
| MASK_BLUR_AMOUNT = 0.1 | |
| MASK_ERODE_AMOUNT = 0.15 | |
| FACE_SWAPPER = None | |
| FACE_ANALYSER = None | |
| FACE_ENHANCER = None | |
| FACE_PARSER = None | |
| NSFW_DETECTOR = None | |
| FACE_ENHANCER_LIST = ["NONE"] | |
| FACE_ENHANCER_LIST.extend(get_available_enhancer_names()) | |
| FACE_ENHANCER_LIST.extend(cv2_interpolations) | |
| ## ------------------------------ SET EXECUTION PROVIDER ------------------------------ | |
| # Note: Non CUDA users may change settings here | |
| PROVIDER = ["CPUExecutionProvider"] | |
| if USE_CUDA: | |
| available_providers = onnxruntime.get_available_providers() | |
| if "CUDAExecutionProvider" in available_providers: | |
| print("\n********** Running on CUDA **********\n") | |
| PROVIDER = ["CUDAExecutionProvider", "CPUExecutionProvider"] | |
| else: | |
| USE_CUDA = False | |
| print("\n********** CUDA unavailable running on CPU **********\n") | |
| else: | |
| USE_CUDA = False | |
| print("\n********** Running on CPU **********\n") | |
| device = "cuda" if USE_CUDA else "cpu" | |
| EMPTY_CACHE = lambda: torch.cuda.empty_cache() if device == "cuda" else None | |
| ## ------------------------------ LOAD MODELS ------------------------------ | |
| def load_face_analyser_model(name="buffalo_l"): | |
| global FACE_ANALYSER | |
| if FACE_ANALYSER is None: | |
| FACE_ANALYSER = insightface.app.FaceAnalysis(name=name, providers=PROVIDER) | |
| FACE_ANALYSER.prepare( | |
| ctx_id=0, det_size=(DETECT_SIZE, DETECT_SIZE), det_thresh=DETECT_THRESH | |
| ) | |
| def load_face_swapper_model(path="./assets/pretrained_models/inswapper_128.onnx"): | |
| global FACE_SWAPPER | |
| if FACE_SWAPPER is None: | |
| batch = int(BATCH_SIZE) if device == "cuda" else 1 | |
| FACE_SWAPPER = Inswapper(model_file=path, batch_size=batch, providers=PROVIDER) | |
| def load_face_parser_model(path="./assets/pretrained_models/79999_iter.pth"): | |
| global FACE_PARSER | |
| if FACE_PARSER is None: | |
| FACE_PARSER = init_parsing_model(path, device=device) | |
| def load_nsfw_detector_model(path="./assets/pretrained_models/open-nsfw.onnx"): | |
| global NSFW_DETECTOR | |
| if NSFW_DETECTOR is None: | |
| NSFW_DETECTOR = NSFWChecker(model_path=path, providers=PROVIDER) | |
| load_face_analyser_model() | |
| load_face_swapper_model() | |
| ## ------------------------------ MAIN PROCESS ------------------------------ | |
| @spaces.GPU(duration=300, enable_queue=True) | |
| def process( | |
| input_type, | |
| image_path, | |
| video_path, | |
| directory_path, | |
| source_path, | |
| output_path, | |
| output_name, | |
| keep_output_sequence, | |
| condition, | |
| age, | |
| distance, | |
| face_enhancer_name, | |
| enable_face_parser, | |
| mask_includes, | |
| mask_soft_kernel, | |
| mask_soft_iterations, | |
| blur_amount, | |
| erode_amount, | |
| face_scale, | |
| enable_laplacian_blend, | |
| crop_top, | |
| crop_bott, | |
| crop_left, | |
| crop_right, | |
| *specifics, | |
| ): | |
| global WORKSPACE | |
| global OUTPUT_FILE | |
| global PREVIEW | |
| WORKSPACE, OUTPUT_FILE, PREVIEW = None, None, None | |
| ## ------------------------------ GUI UPDATE FUNC ------------------------------ | |
| def ui_before(): | |
| return ( | |
| gr.update(visible=True, value=PREVIEW), | |
| gr.update(interactive=False), | |
| gr.update(interactive=False), | |
| gr.update(visible=False), | |
| ) | |
| def ui_after(): | |
| return ( | |
| gr.update(visible=True, value=PREVIEW), | |
| gr.update(interactive=True), | |
| gr.update(interactive=True), | |
| gr.update(visible=False), | |
| ) | |
| def ui_after_vid(): | |
| return ( | |
| gr.update(visible=False), | |
| gr.update(interactive=True), | |
| gr.update(interactive=True), | |
| gr.update(value=OUTPUT_FILE, visible=True), | |
| ) | |
| start_time = time.time() | |
| total_exec_time = lambda start_time: divmod(time.time() - start_time, 60) | |
| get_finsh_text = lambda start_time: f"βοΈ Completed in {int(total_exec_time(start_time)[0])} min {int(total_exec_time(start_time)[1])} sec." | |
| ## ------------------------------ PREPARE INPUTS & LOAD MODELS ------------------------------ | |
| yield "### \n β Loading NSFW detector model...", *ui_before() | |
| load_nsfw_detector_model() | |
| yield "### \n β Loading face analyser model...", *ui_before() | |
| load_face_analyser_model() | |
| yield "### \n β Loading face swapper model...", *ui_before() | |
| load_face_swapper_model() | |
| if face_enhancer_name != "NONE": | |
| if face_enhancer_name not in cv2_interpolations: | |
| yield f"### \n β Loading {face_enhancer_name} model...", *ui_before() | |
| FACE_ENHANCER = load_face_enhancer_model(name=face_enhancer_name, device=device) | |
| else: | |
| FACE_ENHANCER = None | |
| if enable_face_parser: | |
| yield "### \n β Loading face parsing model...", *ui_before() | |
| load_face_parser_model() | |
| includes = mask_regions_to_list(mask_includes) | |
| specifics = list(specifics) | |
| half = len(specifics) // 2 | |
| sources = specifics[:half] | |
| specifics = specifics[half:] | |
| if crop_top > crop_bott: | |
| crop_top, crop_bott = crop_bott, crop_top | |
| if crop_left > crop_right: | |
| crop_left, crop_right = crop_right, crop_left | |
| crop_mask = (crop_top, 511-crop_bott, crop_left, 511-crop_right) | |
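| # the sliders give absolute positions in the 512x512 mask space, while mask_crop expects | |
| # (top, bottom, left, right) as pixel amounts measured in from each edge, hence the 511 - value conversion | |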
| def swap_process(image_sequence): | |
| ## ------------------------------ CONTENT CHECK ------------------------------ | |
| yield "### \n β Checking contents...", *ui_before() | |
| nsfw = NSFW_DETECTOR.is_nsfw(image_sequence) | |
| if nsfw: | |
| message = "NSFW Content detected !!!" | |
| yield f"### \n π {message}", *ui_before() | |
| assert not nsfw, message | |
| return False | |
| EMPTY_CACHE() | |
| ## ------------------------------ ANALYSE FACE ------------------------------ | |
| yield "### \n β Analysing face data...", *ui_before() | |
| if condition != "Specific Face": | |
| source_data = source_path, age | |
| else: | |
| source_data = ((sources, specifics), distance) | |
| analysed_targets, analysed_sources, whole_frame_list, num_faces_per_frame = get_analysed_data( | |
| FACE_ANALYSER, | |
| image_sequence, | |
| source_data, | |
| swap_condition=condition, | |
| detect_condition=DETECT_CONDITION, | |
| scale=face_scale | |
| ) | |
| ## ------------------------------ SWAP FUNC ------------------------------ | |
| yield "### \n β Generating faces...", *ui_before() | |
| preds = [] | |
| matrs = [] | |
| count = 0 | |
| global PREVIEW | |
| for batch_pred, batch_matr in FACE_SWAPPER.batch_forward(whole_frame_list, analysed_targets, analysed_sources): | |
| preds.extend(batch_pred) | |
| matrs.extend(batch_matr) | |
| EMPTY_CACHE() | |
| count += 1 | |
| if USE_CUDA: | |
| image_grid = create_image_grid(batch_pred, size=128) | |
| PREVIEW = image_grid[:, :, ::-1] | |
| yield f"### \n β Generating face Batch {count}", *ui_before() | |
| ## ------------------------------ FACE ENHANCEMENT ------------------------------ | |
| generated_len = len(preds) | |
| if face_enhancer_name != "NONE": | |
| yield f"### \n β Upscaling faces with {face_enhancer_name}...", *ui_before() | |
| for idx, pred in tqdm(enumerate(preds), total=generated_len, desc=f"Upscaling with {face_enhancer_name}"): | |
| enhancer_model, enhancer_model_runner = FACE_ENHANCER | |
| pred = enhancer_model_runner(pred, enhancer_model) | |
| preds[idx] = cv2.resize(pred, (512,512)) | |
| EMPTY_CACHE() | |
| ## ------------------------------ FACE PARSING ------------------------------ | |
| if enable_face_parser: | |
| yield "### \n β Face-parsing mask...", *ui_before() | |
| masks = [] | |
| count = 0 | |
| for batch_mask in get_parsed_mask(FACE_PARSER, preds, classes=includes, device=device, batch_size=BATCH_SIZE, softness=int(mask_soft_iterations)): | |
| masks.append(batch_mask) | |
| EMPTY_CACHE() | |
| count += 1 | |
| if len(batch_mask) > 1: | |
| image_grid = create_image_grid(batch_mask, size=128) | |
| PREVIEW = image_grid[:, :, ::-1] | |
| yield f"### \n β Face parsing Batch {count}", *ui_before() | |
| masks = np.concatenate(masks, axis=0) if len(masks) >= 1 else masks | |
| else: | |
| masks = [None] * generated_len | |
| ## ------------------------------ SPLIT LIST ------------------------------ | |
| split_preds = split_list_by_lengths(preds, num_faces_per_frame) | |
| del preds | |
| split_matrs = split_list_by_lengths(matrs, num_faces_per_frame) | |
| del matrs | |
| split_masks = split_list_by_lengths(masks, num_faces_per_frame) | |
| del masks | |
| ## ------------------------------ PASTE-BACK ------------------------------ | |
| yield "### \n β Pasting back...", *ui_before() | |
| def post_process(frame_idx, frame_img, split_preds, split_matrs, split_masks, enable_laplacian_blend, crop_mask, blur_amount, erode_amount): | |
| whole_img_path = frame_img | |
| whole_img = cv2.imread(whole_img_path) | |
| blend_method = 'laplacian' if enable_laplacian_blend else 'linear' | |
| for p, m, mask in zip(split_preds[frame_idx], split_matrs[frame_idx], split_masks[frame_idx]): | |
| p = cv2.resize(p, (512,512)) | |
| mask = cv2.resize(mask, (512,512)) if mask is not None else None | |
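| # the alignment matrix was estimated for a 128x128 crop; dividing by 0.25 scales it up by 4 | |
| # to match the face that was just resized to 512x512 before pasting back | |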
| m /= 0.25 | |
| whole_img = paste_to_whole(p, whole_img, m, mask=mask, crop_mask=crop_mask, blend_method=blend_method, blur_amount=blur_amount, erode_amount=erode_amount) | |
| cv2.imwrite(whole_img_path, whole_img) | |
| def concurrent_post_process(image_sequence, *args): | |
| with concurrent.futures.ThreadPoolExecutor() as executor: | |
| futures = [] | |
| for idx, frame_img in enumerate(image_sequence): | |
| future = executor.submit(post_process, idx, frame_img, *args) | |
| futures.append(future) | |
| for future in tqdm(concurrent.futures.as_completed(futures), total=len(futures), desc="Pasting back"): | |
| result = future.result() | |
| concurrent_post_process( | |
| image_sequence, | |
| split_preds, | |
| split_matrs, | |
| split_masks, | |
| enable_laplacian_blend, | |
| crop_mask, | |
| blur_amount, | |
| erode_amount | |
| ) | |
| ## ------------------------------ IMAGE ------------------------------ | |
| if input_type == "Image": | |
| target = cv2.imread(image_path) | |
| output_file = os.path.join(output_path, output_name + ".png") | |
| cv2.imwrite(output_file, target) | |
| for info_update in swap_process([output_file]): | |
| yield info_update | |
| OUTPUT_FILE = output_file | |
| WORKSPACE = output_path | |
| PREVIEW = cv2.imread(output_file)[:, :, ::-1] | |
| yield get_finsh_text(start_time), *ui_after() | |
| ## ------------------------------ VIDEO ------------------------------ | |
| elif input_type == "Video": | |
| temp_path = os.path.join(output_path, output_name, "sequence") | |
| os.makedirs(temp_path, exist_ok=True) | |
| yield "### \n β Extracting video frames...", *ui_before() | |
| image_sequence = [] | |
| cap = cv2.VideoCapture(video_path) | |
| curr_idx = 0 | |
| while True: | |
| ret, frame = cap.read() | |
| if not ret: break | |
| frame_path = os.path.join(temp_path, f"frame_{curr_idx}.jpg") | |
| cv2.imwrite(frame_path, frame) | |
| image_sequence.append(frame_path) | |
| curr_idx += 1 | |
| cap.release() | |
| cv2.destroyAllWindows() | |
| for info_update in swap_process(image_sequence): | |
| yield info_update | |
| yield "### \n β Merging sequence...", *ui_before() | |
| output_video_path = os.path.join(output_path, output_name + ".mp4") | |
| merge_img_sequence_from_ref(video_path, image_sequence, output_video_path) | |
| if os.path.exists(temp_path) and not keep_output_sequence: | |
| yield "### \n β Removing temporary files...", *ui_before() | |
| shutil.rmtree(temp_path) | |
| WORKSPACE = output_path | |
| OUTPUT_FILE = output_video_path | |
| yield get_finsh_text(start_time), *ui_after_vid() | |
| ## ------------------------------ DIRECTORY ------------------------------ | |
| elif input_type == "Directory": | |
| extensions = ["jpg", "jpeg", "png", "bmp", "tiff", "ico", "webp"] | |
| temp_path = os.path.join(output_path, output_name) | |
| if os.path.exists(temp_path): | |
| shutil.rmtree(temp_path) | |
| os.mkdir(temp_path) | |
| file_paths =[] | |
| for file_path in glob.glob(os.path.join(directory_path, "*")): | |
| if any(file_path.lower().endswith(ext) for ext in extensions): | |
| img = cv2.imread(file_path) | |
| new_file_path = os.path.join(temp_path, os.path.basename(file_path)) | |
| cv2.imwrite(new_file_path, img) | |
| file_paths.append(new_file_path) | |
| for info_update in swap_process(file_paths): | |
| yield info_update | |
| PREVIEW = cv2.imread(file_paths[-1])[:, :, ::-1] | |
| WORKSPACE = temp_path | |
| OUTPUT_FILE = file_paths[-1] | |
| yield get_finsh_text(start_time), *ui_after() | |
| ## ------------------------------ STREAM ------------------------------ | |
| elif input_type == "Stream": | |
| pass | |
| ## ------------------------------ GRADIO FUNC ------------------------------ | |
| def update_radio(value): | |
| if value == "Image": | |
| return ( | |
| gr.update(visible=True), | |
| gr.update(visible=False), | |
| gr.update(visible=False), | |
| ) | |
| elif value == "Video": | |
| return ( | |
| gr.update(visible=False), | |
| gr.update(visible=True), | |
| gr.update(visible=False), | |
| ) | |
| elif value == "Directory": | |
| return ( | |
| gr.update(visible=False), | |
| gr.update(visible=False), | |
| gr.update(visible=True), | |
| ) | |
| elif value == "Stream": | |
| return ( | |
| gr.update(visible=False), | |
| gr.update(visible=False), | |
| gr.update(visible=True), | |
| ) | |
| def swap_option_changed(value): | |
| if value.startswith("Age"): | |
| return ( | |
| gr.update(visible=True), | |
| gr.update(visible=False), | |
| gr.update(visible=True), | |
| ) | |
| elif value == "Specific Face": | |
| return ( | |
| gr.update(visible=False), | |
| gr.update(visible=True), | |
| gr.update(visible=False), | |
| ) | |
| return gr.update(visible=False), gr.update(visible=False), gr.update(visible=True) | |
| def video_changed(video_path): | |
| sliders_update = gr.Slider.update | |
| button_update = gr.Button.update | |
| number_update = gr.Number.update | |
| if video_path is None: | |
| return ( | |
| sliders_update(minimum=0, maximum=0, value=0), | |
| sliders_update(minimum=1, maximum=1, value=1), | |
| number_update(value=1), | |
| ) | |
| try: | |
| clip = VideoFileClip(video_path) | |
| fps = clip.fps | |
| total_frames = clip.reader.nframes | |
| clip.close() | |
| return ( | |
| sliders_update(minimum=0, maximum=total_frames, value=0, interactive=True), | |
| sliders_update( | |
| minimum=0, maximum=total_frames, value=total_frames, interactive=True | |
| ), | |
| number_update(value=fps), | |
| ) | |
| except: | |
| return ( | |
| sliders_update(value=0), | |
| sliders_update(value=0), | |
| number_update(value=1), | |
| ) | |
| def analyse_settings_changed(detect_condition, detection_size, detection_threshold): | |
| yield "### \n β Applying new values..." | |
| global FACE_ANALYSER | |
| global DETECT_CONDITION | |
| DETECT_CONDITION = detect_condition | |
| FACE_ANALYSER = insightface.app.FaceAnalysis(name="buffalo_l", providers=PROVIDER) | |
| FACE_ANALYSER.prepare( | |
| ctx_id=0, | |
| det_size=(int(detection_size), int(detection_size)), | |
| det_thresh=float(detection_threshold), | |
| ) | |
| yield f"### \n βοΈ Applied detect condition:{detect_condition}, detection size: {detection_size}, detection threshold: {detection_threshold}" | |
| def stop_running(): | |
| global STREAMER | |
| if hasattr(STREAMER, "stop"): | |
| STREAMER.stop() | |
| STREAMER = None | |
| return "Cancelled" | |
| def slider_changed(show_frame, video_path, frame_index): | |
| if not show_frame: | |
| return None, None | |
| if video_path is None: | |
| return None, None | |
| clip = VideoFileClip(video_path) | |
| frame = clip.get_frame(frame_index / clip.fps) | |
| frame_array = np.array(frame) | |
| clip.close() | |
| return gr.Image.update(value=frame_array, visible=True), gr.Video.update( | |
| visible=False | |
| ) | |
| def trim_and_reload(video_path, output_path, output_name, start_frame, stop_frame): | |
| yield video_path, f"### \n β Trimming video frame {start_frame} to {stop_frame}..." | |
| try: | |
| output_path = os.path.join(output_path, output_name) | |
| trimmed_video = trim_video(video_path, output_path, start_frame, stop_frame) | |
| yield trimmed_video, "### \n βοΈ Video trimmed and reloaded." | |
| except Exception as e: | |
| print(e) | |
| yield video_path, "### \n β Video trimming failed. See console for more info." | |
| ## ------------------------------ GRADIO GUI ------------------------------ | |
| css = """ | |
| footer{display:none !important} | |
| """ | |
| with gr.Blocks(css=css) as interface: | |
| gr.Markdown("# πΏ Swap Mukham") | |
| gr.Markdown("### Face swap app based on insightface inswapper.") | |
| with gr.Row(): | |
| with gr.Row(): | |
| with gr.Column(scale=0.4): | |
| with gr.Tab("π Swap Condition"): | |
| swap_option = gr.Dropdown( | |
| swap_options_list, | |
| info="Choose which face or faces in the target image to swap.", | |
| multiselect=False, | |
| show_label=False, | |
| value=swap_options_list[0], | |
| interactive=True, | |
| ) | |
| age = gr.Number( | |
| value=25, label="Value", interactive=True, visible=False | |
| ) | |
| with gr.Tab("ποΈ Detection Settings"): | |
| detect_condition_dropdown = gr.Dropdown( | |
| detect_conditions, | |
| label="Condition", | |
| value=DETECT_CONDITION, | |
| interactive=True, | |
| info="This condition is only used when multiple faces are detected on source or specific image.", | |
| ) | |
| detection_size = gr.Number( | |
| label="Detection Size", value=DETECT_SIZE, interactive=True | |
| ) | |
| detection_threshold = gr.Number( | |
| label="Detection Threshold", | |
| value=DETECT_THRESH, | |
| interactive=True, | |
| ) | |
| apply_detection_settings = gr.Button("Apply settings") | |
| with gr.Tab("π€ Output Settings"): | |
| output_directory = gr.Text( | |
| label="Output Directory", | |
| value=DEF_OUTPUT_PATH, | |
| interactive=True, | |
| ) | |
| output_name = gr.Text( | |
| label="Output Name", value="Result", interactive=True | |
| ) | |
| keep_output_sequence = gr.Checkbox( | |
| label="Keep output sequence", value=False, interactive=True | |
| ) | |
| with gr.Tab("πͺ Other Settings"): | |
| face_scale = gr.Slider( | |
| label="Face Scale", | |
| minimum=0, | |
| maximum=2, | |
| value=1, | |
| interactive=True, | |
| ) | |
| face_enhancer_name = gr.Dropdown( | |
| FACE_ENHANCER_LIST, label="Face Enhancer", value="NONE", multiselect=False, interactive=True | |
| ) | |
| with gr.Accordion("Advanced Mask", open=False): | |
| enable_face_parser_mask = gr.Checkbox( | |
| label="Enable Face Parsing", | |
| value=False, | |
| interactive=True, | |
| ) | |
| mask_include = gr.Dropdown( | |
| mask_regions.keys(), | |
| value=MASK_INCLUDE, | |
| multiselect=True, | |
| label="Include", | |
| interactive=True, | |
| ) | |
| mask_soft_kernel = gr.Number( | |
| label="Soft Erode Kernel", | |
| value=MASK_SOFT_KERNEL, | |
| minimum=3, | |
| interactive=True, | |
| visible = False | |
| ) | |
| mask_soft_iterations = gr.Number( | |
| label="Soft Erode Iterations", | |
| value=MASK_SOFT_ITERATIONS, | |
| minimum=0, | |
| interactive=True, | |
| ) | |
| with gr.Accordion("Crop Mask", open=False): | |
| crop_top = gr.Slider(label="Top", minimum=0, maximum=511, value=0, step=1, interactive=True) | |
| crop_bott = gr.Slider(label="Bottom", minimum=0, maximum=511, value=511, step=1, interactive=True) | |
| crop_left = gr.Slider(label="Left", minimum=0, maximum=511, value=0, step=1, interactive=True) | |
| crop_right = gr.Slider(label="Right", minimum=0, maximum=511, value=511, step=1, interactive=True) | |
| erode_amount = gr.Slider( | |
| label="Mask Erode", | |
| minimum=0, | |
| maximum=1, | |
| value=MASK_ERODE_AMOUNT, | |
| step=0.05, | |
| interactive=True, | |
| ) | |
| blur_amount = gr.Slider( | |
| label="Mask Blur", | |
| minimum=0, | |
| maximum=1, | |
| value=MASK_BLUR_AMOUNT, | |
| step=0.05, | |
| interactive=True, | |
| ) | |
| enable_laplacian_blend = gr.Checkbox( | |
| label="Laplacian Blending", | |
| value=True, | |
| interactive=True, | |
| ) | |
| source_image_input = gr.Image( | |
| label="Source face", type="filepath", interactive=True | |
| ) | |
| with gr.Group(visible=False) as specific_face: | |
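| # the loop below uses exec to generate NUM_OF_SRC_SPECIFIC pairs of image inputs | |
| # (src1/trg1 ... src10/trg10), one tab per (source face, specific face) pair; the same | |
| # names are gathered again via exec when building src_specific_inputs further down | |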
| for i in range(NUM_OF_SRC_SPECIFIC): | |
| idx = i + 1 | |
| code = "\n" | |
| code += f"with gr.Tab(label='({idx})'):" | |
| code += "\n\twith gr.Row():" | |
| code += f"\n\t\tsrc{idx} = gr.Image(interactive=True, type='numpy', label='Source Face {idx}')" | |
| code += f"\n\t\ttrg{idx} = gr.Image(interactive=True, type='numpy', label='Specific Face {idx}')" | |
| exec(code) | |
| distance_slider = gr.Slider( | |
| minimum=0, | |
| maximum=2, | |
| value=0.6, | |
| interactive=True, | |
| label="Distance", | |
| info="Lower distance is more similar and higher distance is less similar to the target face.", | |
| ) | |
| with gr.Group(): | |
| input_type = gr.Radio( | |
| ["Image", "Video"], | |
| label="Target Type", | |
| value="Image", | |
| ) | |
| with gr.Group(visible=True) as input_image_group: | |
| image_input = gr.Image( | |
| label="Target Image", interactive=True, type="filepath" | |
| ) | |
| with gr.Group(visible=False) as input_video_group: | |
| vid_widget = gr.Video if USE_COLAB else gr.Text | |
| video_input = gr.Video( | |
| label="Target Video", interactive=True | |
| ) | |
| with gr.Accordion("βοΈ Trim video", open=False): | |
| with gr.Column(): | |
| with gr.Row(): | |
| set_slider_range_btn = gr.Button( | |
| "Set frame range", interactive=True | |
| ) | |
| show_trim_preview_btn = gr.Checkbox( | |
| label="Show frame when slider change", | |
| value=True, | |
| interactive=True, | |
| ) | |
| video_fps = gr.Number( | |
| value=30, | |
| interactive=False, | |
| label="Fps", | |
| visible=False, | |
| ) | |
| start_frame = gr.Slider( | |
| minimum=0, | |
| maximum=1, | |
| value=0, | |
| step=1, | |
| interactive=True, | |
| label="Start Frame", | |
| info="", | |
| ) | |
| end_frame = gr.Slider( | |
| minimum=0, | |
| maximum=1, | |
| value=1, | |
| step=1, | |
| interactive=True, | |
| label="End Frame", | |
| info="", | |
| ) | |
| trim_and_reload_btn = gr.Button( | |
| "Trim and Reload", interactive=True | |
| ) | |
| with gr.Group(visible=False) as input_directory_group: | |
| direc_input = gr.Text(label="Path", interactive=True) | |
| with gr.Column(scale=0.6): | |
| info = gr.Markdown(value="...") | |
| with gr.Row(): | |
| swap_button = gr.Button("β¨ Swap", variant="primary") | |
| cancel_button = gr.Button("β Cancel") | |
| preview_image = gr.Image(label="Output", interactive=False) | |
| preview_video = gr.Video( | |
| label="Output", interactive=False, visible=False | |
| ) | |
| with gr.Row(): | |
| output_directory_button = gr.Button( | |
| "π", interactive=False, visible=False | |
| ) | |
| output_video_button = gr.Button( | |
| "π¬", interactive=False, visible=False | |
| ) | |
| with gr.Group(): | |
| with gr.Row(): | |
| gr.Markdown( | |
| "### [π€ Sponsor](https://github.com/sponsors/harisreedhar)" | |
| ) | |
| gr.Markdown( | |
| "### [π¨βπ» Source code](https://github.com/harisreedhar/Swap-Mukham)" | |
| ) | |
| gr.Markdown( | |
| "### [β οΈ Disclaimer](https://github.com/harisreedhar/Swap-Mukham#disclaimer)" | |
| ) | |
| gr.Markdown( | |
| "### [π Run in Colab](https://colab.research.google.com/github/harisreedhar/Swap-Mukham/blob/main/swap_mukham_colab.ipynb)" | |
| ) | |
| gr.Markdown( | |
| "### [π€ Acknowledgements](https://github.com/harisreedhar/Swap-Mukham#acknowledgements)" | |
| ) | |
| ## ------------------------------ GRADIO EVENTS ------------------------------ | |
| set_slider_range_event = set_slider_range_btn.click( | |
| video_changed, | |
| inputs=[video_input], | |
| outputs=[start_frame, end_frame, video_fps], | |
| ) | |
| trim_and_reload_event = trim_and_reload_btn.click( | |
| fn=trim_and_reload, | |
| inputs=[video_input, output_directory, output_name, start_frame, end_frame], | |
| outputs=[video_input, info], | |
| ) | |
| start_frame_event = start_frame.release( | |
| fn=slider_changed, | |
| inputs=[show_trim_preview_btn, video_input, start_frame], | |
| outputs=[preview_image, preview_video], | |
| show_progress=True, | |
| ) | |
| end_frame_event = end_frame.release( | |
| fn=slider_changed, | |
| inputs=[show_trim_preview_btn, video_input, end_frame], | |
| outputs=[preview_image, preview_video], | |
| show_progress=True, | |
| ) | |
| input_type.change( | |
| update_radio, | |
| inputs=[input_type], | |
| outputs=[input_image_group, input_video_group, input_directory_group], | |
| ) | |
| swap_option.change( | |
| swap_option_changed, | |
| inputs=[swap_option], | |
| outputs=[age, specific_face, source_image_input], | |
| ) | |
| apply_detection_settings.click( | |
| analyse_settings_changed, | |
| inputs=[detect_condition_dropdown, detection_size, detection_threshold], | |
| outputs=[info], | |
| ) | |
| src_specific_inputs = [] | |
| gen_variable_txt = ",".join( | |
| [f"src{i+1}" for i in range(NUM_OF_SRC_SPECIFIC)] | |
| + [f"trg{i+1}" for i in range(NUM_OF_SRC_SPECIFIC)] | |
| ) | |
| exec(f"src_specific_inputs = ({gen_variable_txt})") | |
| swap_inputs = [ | |
| input_type, | |
| image_input, | |
| video_input, | |
| direc_input, | |
| source_image_input, | |
| output_directory, | |
| output_name, | |
| keep_output_sequence, | |
| swap_option, | |
| age, | |
| distance_slider, | |
| face_enhancer_name, | |
| enable_face_parser_mask, | |
| mask_include, | |
| mask_soft_kernel, | |
| mask_soft_iterations, | |
| blur_amount, | |
| erode_amount, | |
| face_scale, | |
| enable_laplacian_blend, | |
| crop_top, | |
| crop_bott, | |
| crop_left, | |
| crop_right, | |
| *src_specific_inputs, | |
| ] | |
| swap_outputs = [ | |
| info, | |
| preview_image, | |
| output_directory_button, | |
| output_video_button, | |
| preview_video, | |
| ] | |
| swap_event = swap_button.click( | |
| fn=process, inputs=swap_inputs, outputs=swap_outputs, show_progress=True | |
| ) | |
| cancel_button.click( | |
| fn=stop_running, | |
| inputs=None, | |
| outputs=[info], | |
| cancels=[ | |
| swap_event, | |
| trim_and_reload_event, | |
| set_slider_range_event, | |
| start_frame_event, | |
| end_frame_event, | |
| ], | |
| show_progress=True, | |
| ) | |
| output_directory_button.click( | |
| lambda: open_directory(path=WORKSPACE), inputs=None, outputs=None | |
| ) | |
| output_video_button.click( | |
| lambda: open_directory(path=OUTPUT_FILE), inputs=None, outputs=None | |
| ) | |
| if __name__ == "__main__": | |
| if USE_COLAB: | |
| print("Running in colab mode") | |
| interface.launch() | |
| #### APP.PY CODE END ### | |