euIaxs22 commited on
Commit
04d8889
·
verified ·
1 Parent(s): 91583b8

Update services/vince_pool_manager.py

Browse files
Files changed (1) hide show
  1. services/vince_pool_manager.py +38 -50
services/vince_pool_manager.py CHANGED
@@ -11,11 +11,12 @@ from huggingface_hub import snapshot_download
11
  from omegaconf import OmegaConf, open_dict
12
 
13
  # --- Configurações Globais ---
 
14
  VINCIE_DIR = Path(os.getenv("VINCIE_DIR", "/data/VINCIE"))
15
  VINCE_GIT_URL = os.getenv("VINCE_GIT_URL", "https://github.com/ByteDance-Seed/VINCIE")
16
  VINCE_REPO_ID = os.getenv("VINCE_REPO_ID", "ByteDance-Seed/VINCIE-3B")
17
  VINCE_CKPT = Path(os.getenv("VINCE_CKPT", "/data/ckpt/VINCIE-3B"))
18
- HF_HOME_CACHE = os.getenv("HF_HOME")
19
 
20
  # --- Classe Worker (Gerencia uma única GPU de forma isolada) ---
21
  class VinceWorker:
@@ -30,44 +31,26 @@ class VinceWorker:
30
  print(f"[VinceWorker-{self.device_id_str}] Inicializado. Mapeado para o índice de GPU {self.gpu_index}.")
31
 
32
  def _execute_in_isolated_env(self, function_to_run, *args, **kwargs):
33
- """
34
- Wrapper que define CUDA_VISIBLE_DEVICES antes de executar uma função
35
- e o limpa depois. Garante que o código VINCIE veja apenas uma GPU.
36
- """
37
- # Guarda a variável de ambiente original, se existir
38
  original_cuda_visible = os.environ.get('CUDA_VISIBLE_DEVICES')
39
-
40
  try:
41
- # Isola a visibilidade da GPU para este worker
42
  os.environ['CUDA_VISIBLE_DEVICES'] = self.gpu_index
43
-
44
- # Limpa qualquer cache de dispositivo que o PyTorch possa ter
45
  if torch.cuda.is_available():
46
- torch.cuda.set_device(0) # Agora 'cuda:0' é a nossa GPU alvo
47
-
48
- # Executa a função alvo (carregamento ou inferência)
49
  return function_to_run(*args, **kwargs)
50
-
51
  finally:
52
- # Restaura a variável de ambiente original
53
  if original_cuda_visible is not None:
54
  os.environ['CUDA_VISIBLE_DEVICES'] = original_cuda_visible
55
- else:
56
- # Remove a variável se ela não existia antes
57
- if 'CUDA_VISIBLE_DEVICES' in os.environ:
58
- del os.environ['CUDA_VISIBLE_DEVICES']
59
 
60
  def _load_model_task(self):
61
  """Tarefa de carregamento do modelo, a ser executada no ambiente isolado."""
62
  print(f"[VinceWorker-{self.device_id_str}] Carregando modelo para a VRAM (GPU física visível: {self.gpu_index})...")
63
-
64
- # Para o código VINCIE, a única GPU que ele vê é 'cuda:0'
65
  device_for_vincie = 'cuda:0'
66
-
67
  original_cwd = Path.cwd()
68
  try:
69
  os.chdir(str(VINCIE_DIR))
70
-
71
  if str(VINCIE_DIR) not in sys.path: sys.path.insert(0, str(VINCIE_DIR))
72
 
73
  from common.config import load_config, create_object
@@ -81,12 +64,10 @@ class VinceWorker:
81
 
82
  self.gen.to(torch.device(device_for_vincie))
83
  print(f"[VinceWorker-{self.device_id_str}] ✅ Modelo VINCIE está 'quente' e pronto na GPU física {self.gpu_index}.")
84
-
85
  finally:
86
  os.chdir(original_cwd)
87
 
88
  def load_model_to_gpu(self):
89
- """Wrapper público para carregar o modelo no ambiente isolado."""
90
  if self.gen is None:
91
  self._execute_in_isolated_env(self._load_model_task)
92
 
@@ -107,18 +88,17 @@ class VinceWorker:
107
 
108
  self.gen.inference_loop()
109
  return Path(kwargs["output_dir"])
110
-
111
  finally:
112
  os.chdir(original_cwd)
113
  gc.collect()
114
  torch.cuda.empty_cache()
115
 
116
  def infer(self, **kwargs) -> Path:
117
- """Wrapper público para executar a inferência no ambiente isolado."""
118
  if self.gen is None:
119
  raise RuntimeError(f"Modelo no worker {self.device_id_str} não foi carregado.")
120
  return self._execute_in_isolated_env(self._infer_task, **kwargs)
121
 
 
122
  # --- Classe Pool Manager (A Orquestradora Singleton) ---
123
  class VincePoolManager:
124
  _instance = None
@@ -132,54 +112,61 @@ class VincePoolManager:
132
  cls._instance._initialized = False
133
  return cls._instance
134
 
135
- def __init__(self, num_gpus: int = 4, config_path: str = "/data/VINCIE/configs/generate.yaml", output_root: str = "/app/outputs"):
136
  if self._initialized: return
137
  with self._lock:
138
  if self._initialized: return
139
 
140
- print("Inicializando o VincePoolManager (Multi-GPU, Cache-Enabled, OOM-Fixed)...")
141
  self.output_root = Path(output_root)
142
  self.output_root.mkdir(parents=True, exist_ok=True)
143
  self.worker_lock = threading.Lock()
144
  self.next_worker_idx = 0
145
 
146
- self._setup_dependencies()
147
 
148
  if not torch.cuda.is_available() or torch.cuda.device_count() < num_gpus:
149
  raise RuntimeError(f"Erro: {num_gpus} GPUs são necessárias, mas {torch.cuda.device_count()} foram encontradas.")
150
 
151
  devices = [f'cuda:{i}' for i in range(num_gpus)]
152
- self.workers = [VinceWorker(dev_id, config_path) for dev_id in devices]
 
153
 
154
  print("Iniciando carregamento dos modelos em paralelo para todas as GPUs...")
155
  threads = [threading.Thread(target=worker.load_model_to_gpu) for worker in self.workers]
156
- for t in threads:
157
- t.start()
158
- for t in threads:
159
- t.join()
160
 
161
  self._initialized = True
162
  print(f"✅ VincePoolManager pronto com {len(self.workers)} workers 'quentes'.")
163
 
164
- def _setup_dependencies(self):
165
  """Garante que o código e os modelos do VINCIE estejam disponíveis, usando cache."""
166
- if not VINCIE_DIR.exists():
 
167
  print(f"Clonando repositório VINCIE para {VINCIE_DIR}...")
 
168
  subprocess.run(["git", "clone", "--depth", "1", VINCE_GIT_URL, str(VINCIE_DIR)], check=True)
169
  else:
170
- print("Repositório VINCIE já existe.")
171
 
 
172
  print(f"Verificando checkpoints VINCIE em {VINCE_CKPT} (usando cache em {HF_HOME_CACHE})...")
173
- snapshot_download(
174
- repo_id=VINCE_REPO_ID,
175
- local_dir=VINCE_CKPT,
176
- local_dir_use_symlinks=False,
177
- cache_dir=HF_HOME_CACHE,
178
- resume_download=True,
179
- token=os.getenv("HF_TOKEN")
180
- )
181
- print("Checkpoints VINCIE prontos.")
182
-
 
 
 
 
 
183
  repo_ckpt_dir = VINCIE_DIR / "ckpt"
184
  repo_ckpt_dir.mkdir(parents=True, exist_ok=True)
185
  link = repo_ckpt_dir / "VINCIE-3B"
@@ -190,7 +177,6 @@ class VincePoolManager:
190
  print("Symlink de checkpoint já existe.")
191
 
192
  def _get_next_worker(self) -> VinceWorker:
193
- """Obtém o próximo worker disponível em um sistema de rodízio (thread-safe)."""
194
  with self.worker_lock:
195
  worker = self.workers[self.next_worker_idx]
196
  self.next_worker_idx = (self.next_worker_idx + 1) % len(self.workers)
@@ -217,7 +203,9 @@ class VincePoolManager:
217
  # --- Instância Singleton Global ---
218
  try:
219
  NUM_GPUS_FOR_VINCE = int(os.getenv("VINCE_GPUS", "4"))
220
- vince_pool_manager_singleton = VincePoolManager(num_gpus=NUM_GPUS_FOR_VINCE)
 
 
221
  except Exception as e:
222
  print(f"ERRO CRÍTICO ao inicializar o VincePoolManager: {e}", file=sys.stderr)
223
  vince_pool_manager_singleton = None
 
11
  from omegaconf import OmegaConf, open_dict
12
 
13
  # --- Configurações Globais ---
14
+ # Os diretórios são definidos para usar /data para persistência, espelhando o SeedVR.
15
  VINCIE_DIR = Path(os.getenv("VINCIE_DIR", "/data/VINCIE"))
16
  VINCE_GIT_URL = os.getenv("VINCE_GIT_URL", "https://github.com/ByteDance-Seed/VINCIE")
17
  VINCE_REPO_ID = os.getenv("VINCE_REPO_ID", "ByteDance-Seed/VINCIE-3B")
18
  VINCE_CKPT = Path(os.getenv("VINCE_CKPT", "/data/ckpt/VINCIE-3B"))
19
+ HF_HOME_CACHE = os.getenv("HF_HOME", "/data/.cache/huggingface")
20
 
21
  # --- Classe Worker (Gerencia uma única GPU de forma isolada) ---
22
  class VinceWorker:
 
31
  print(f"[VinceWorker-{self.device_id_str}] Inicializado. Mapeado para o índice de GPU {self.gpu_index}.")
32
 
33
  def _execute_in_isolated_env(self, function_to_run, *args, **kwargs):
34
+ """Wrapper que define CUDA_VISIBLE_DEVICES para isolar a visibilidade da GPU."""
 
 
 
 
35
  original_cuda_visible = os.environ.get('CUDA_VISIBLE_DEVICES')
 
36
  try:
 
37
  os.environ['CUDA_VISIBLE_DEVICES'] = self.gpu_index
 
 
38
  if torch.cuda.is_available():
39
+ torch.cuda.set_device(0) # 'cuda:0' agora é a nossa GPU alvo
 
 
40
  return function_to_run(*args, **kwargs)
 
41
  finally:
 
42
  if original_cuda_visible is not None:
43
  os.environ['CUDA_VISIBLE_DEVICES'] = original_cuda_visible
44
+ elif 'CUDA_VISIBLE_DEVICES' in os.environ:
45
+ del os.environ['CUDA_VISIBLE_DEVICES']
 
 
46
 
47
  def _load_model_task(self):
48
  """Tarefa de carregamento do modelo, a ser executada no ambiente isolado."""
49
  print(f"[VinceWorker-{self.device_id_str}] Carregando modelo para a VRAM (GPU física visível: {self.gpu_index})...")
 
 
50
  device_for_vincie = 'cuda:0'
 
51
  original_cwd = Path.cwd()
52
  try:
53
  os.chdir(str(VINCIE_DIR))
 
54
  if str(VINCIE_DIR) not in sys.path: sys.path.insert(0, str(VINCIE_DIR))
55
 
56
  from common.config import load_config, create_object
 
64
 
65
  self.gen.to(torch.device(device_for_vincie))
66
  print(f"[VinceWorker-{self.device_id_str}] ✅ Modelo VINCIE está 'quente' e pronto na GPU física {self.gpu_index}.")
 
67
  finally:
68
  os.chdir(original_cwd)
69
 
70
  def load_model_to_gpu(self):
 
71
  if self.gen is None:
72
  self._execute_in_isolated_env(self._load_model_task)
73
 
 
88
 
89
  self.gen.inference_loop()
90
  return Path(kwargs["output_dir"])
 
91
  finally:
92
  os.chdir(original_cwd)
93
  gc.collect()
94
  torch.cuda.empty_cache()
95
 
96
  def infer(self, **kwargs) -> Path:
 
97
  if self.gen is None:
98
  raise RuntimeError(f"Modelo no worker {self.device_id_str} não foi carregado.")
99
  return self._execute_in_isolated_env(self._infer_task, **kwargs)
100
 
101
+
102
  # --- Classe Pool Manager (A Orquestradora Singleton) ---
103
  class VincePoolManager:
104
  _instance = None
 
112
  cls._instance._initialized = False
113
  return cls._instance
114
 
115
+ def __init__(self, num_gpus: int = 4, output_root: str = "/app/outputs"):
116
  if self._initialized: return
117
  with self._lock:
118
  if self._initialized: return
119
 
120
+ print("Inicializando o VincePoolManager (Padrão SeedVR)...")
121
  self.output_root = Path(output_root)
122
  self.output_root.mkdir(parents=True, exist_ok=True)
123
  self.worker_lock = threading.Lock()
124
  self.next_worker_idx = 0
125
 
126
+ self.setup_dependencies()
127
 
128
  if not torch.cuda.is_available() or torch.cuda.device_count() < num_gpus:
129
  raise RuntimeError(f"Erro: {num_gpus} GPUs são necessárias, mas {torch.cuda.device_count()} foram encontradas.")
130
 
131
  devices = [f'cuda:{i}' for i in range(num_gpus)]
132
+ vincie_config_path = VINCIE_DIR / "configs/generate.yaml"
133
+ self.workers = [VinceWorker(dev_id, str(vincie_config_path)) for dev_id in devices]
134
 
135
  print("Iniciando carregamento dos modelos em paralelo para todas as GPUs...")
136
  threads = [threading.Thread(target=worker.load_model_to_gpu) for worker in self.workers]
137
+ for t in threads: t.start()
138
+ for t in threads: t.join()
 
 
139
 
140
  self._initialized = True
141
  print(f"✅ VincePoolManager pronto com {len(self.workers)} workers 'quentes'.")
142
 
143
+ def setup_dependencies(self):
144
  """Garante que o código e os modelos do VINCIE estejam disponíveis, usando cache."""
145
+ # 1. Código do Repositório (clona em /data/VINCIE)
146
+ if not (VINCIE_DIR / ".git").exists():
147
  print(f"Clonando repositório VINCIE para {VINCIE_DIR}...")
148
+ VINCIE_DIR.parent.mkdir(parents=True, exist_ok=True)
149
  subprocess.run(["git", "clone", "--depth", "1", VINCE_GIT_URL, str(VINCIE_DIR)], check=True)
150
  else:
151
+ print("Repositório VINCIE já existe em /data/VINCIE.")
152
 
153
+ # 2. Modelos (com cache, baixando para /data/ckpt/VINCIE-3B)
154
  print(f"Verificando checkpoints VINCIE em {VINCE_CKPT} (usando cache em {HF_HOME_CACHE})...")
155
+ try:
156
+ snapshot_download(
157
+ repo_id=VINCE_REPO_ID,
158
+ local_dir=VINCE_CKPT,
159
+ local_dir_use_symlinks=False,
160
+ cache_dir=HF_HOME_CACHE,
161
+ resume_download=True,
162
+ token=os.getenv("HF_TOKEN")
163
+ )
164
+ print("Checkpoints VINCIE prontos.")
165
+ except Exception as e:
166
+ print(f"ERRO durante o snapshot_download para VINCIE: {e}")
167
+ raise
168
+
169
+ # 3. Symlink para compatibilidade
170
  repo_ckpt_dir = VINCIE_DIR / "ckpt"
171
  repo_ckpt_dir.mkdir(parents=True, exist_ok=True)
172
  link = repo_ckpt_dir / "VINCIE-3B"
 
177
  print("Symlink de checkpoint já existe.")
178
 
179
  def _get_next_worker(self) -> VinceWorker:
 
180
  with self.worker_lock:
181
  worker = self.workers[self.next_worker_idx]
182
  self.next_worker_idx = (self.next_worker_idx + 1) % len(self.workers)
 
203
  # --- Instância Singleton Global ---
204
  try:
205
  NUM_GPUS_FOR_VINCE = int(os.getenv("VINCE_GPUS", "4"))
206
+ # Passamos `output_root` lido da env var para o construtor.
207
+ output_root_path = os.getenv("OUTPUT_ROOT", "/app/outputs")
208
+ vince_pool_manager_singleton = VincePoolManager(num_gpus=NUM_GPUS_FOR_VINCE, output_root=output_root_path)
209
  except Exception as e:
210
  print(f"ERRO CRÍTICO ao inicializar o VincePoolManager: {e}", file=sys.stderr)
211
  vince_pool_manager_singleton = None