| 
							 | 
						 | 
					
					
						
						| 
							 | 
						 | 
					
					
						
						| 
							 | 
						 | 
					
					
						
						| 
							 | 
						 | 
					
					
						
						| 
							 | 
						 | 
					
					
						
						| 
							 | 
						 | 
					
					
						
						| 
							 | 
						 | 
					
					
						
						| 
							 | 
						 | 
					
					
						
						| 
							 | 
						 | 
					
					
						
						| 
							 | 
						 | 
					
					
						
						| 
							 | 
						 | 
					
					
						
						| 
							 | 
						 | 
					
					
						
						| 
							 | 
						 | 
					
					
						
						| 
							 | 
						 | 
					
					
						
						| 
							 | 
						 | 
					
					
						
						| 
							 | 
						
 | 
					
					
						
						| 
							 | 
						import re | 
					
					
						
						| 
							 | 
						
 | 
					
					
						
						| 
							 | 
						import torch | 
					
					
						
						| 
							 | 
						import torch.nn as nn | 
					
					
						
						| 
							 | 
						from transformers import AutoConfig, AutoModel, PretrainedConfig, PreTrainedModel | 
					
					
						
						| 
							 | 
						
 | 
					
					
						
						| 
							 | 
						
 | 
					
					
						
						| 
							 | 
						class IdentityMap(nn.Module): | 
					
					
						
						| 
							 | 
						    def __init__(self): | 
					
					
						
						| 
							 | 
						        super().__init__() | 
					
					
						
						| 
							 | 
						
 | 
					
					
						
						| 
							 | 
						    def forward(self, x, *args, **kwargs): | 
					
					
						
						| 
							 | 
						        return x | 
					
					
						
						| 
							 | 
						
 | 
					
					
						
						| 
							 | 
						    @property | 
					
					
						
						| 
							 | 
						    def config(self): | 
					
					
						
						| 
							 | 
						        return {"mm_projector_type": "identity"} | 
					
					
						
						| 
							 | 
						
 | 
					
					
						
						| 
							 | 
						
 | 
					
					
						
						| 
							 | 
						class SimpleResBlock(nn.Module): | 
					
					
						
						| 
							 | 
						    def __init__(self, channels): | 
					
					
						
						| 
							 | 
						        super().__init__() | 
					
					
						
						| 
							 | 
						        self.pre_norm = nn.LayerNorm(channels) | 
					
					
						
						| 
							 | 
						
 | 
					
					
						
						| 
							 | 
						        self.proj = nn.Sequential(nn.Linear(channels, channels), nn.GELU(), nn.Linear(channels, channels)) | 
					
					
						
						| 
							 | 
						
 | 
					
					
						
						| 
							 | 
						    def forward(self, x): | 
					
					
						
						| 
							 | 
						        x = self.pre_norm(x) | 
					
					
						
						| 
							 | 
						        return x + self.proj(x) | 
					
					
						
						| 
							 | 
						
 | 
					
					
						
						| 
							 | 
						
 | 
					
					
						
						| 
							 | 
						class DownSampleBlock(nn.Module): | 
					
					
						
						| 
							 | 
						    def forward(self, x): | 
					
					
						
						| 
							 | 
						        vit_embeds = x | 
					
					
						
						| 
							 | 
						        h = w = int(vit_embeds.shape[1] ** 0.5) | 
					
					
						
						| 
							 | 
						        vit_embeds = vit_embeds.reshape(vit_embeds.shape[0], h, w, -1) | 
					
					
						
						| 
							 | 
						        vit_embeds = self.flat_square(vit_embeds) | 
					
					
						
						| 
							 | 
						        vit_embeds = vit_embeds.reshape(vit_embeds.shape[0], -1, vit_embeds.shape[-1]) | 
					
					
						
						| 
							 | 
						        return vit_embeds | 
					
					
						
						| 
							 | 
						
 | 
					
					
						
						| 
							 | 
						    def flat_square(self, x): | 
					
					
						
						| 
							 | 
						        n, w, h, c = x.size() | 
					
					
						
						| 
							 | 
						        if w % 2 == 1: | 
					
					
						
						| 
							 | 
						            x = torch.concat([x, torch.zeros((n, 1, h, c), dtype=x.dtype).to(x.device)], dim=1).contiguous() | 
					
					
						
						| 
							 | 
						            n, w, h, c = x.size() | 
					
					
						
						| 
							 | 
						        if h % 2 == 1: | 
					
					
						
						| 
							 | 
						            x = torch.concat([x, torch.zeros((n, w, 1, c), dtype=x.dtype).to(x.device)], dim=2).contiguous() | 
					
					
						
						| 
							 | 
						            n, w, h, c = x.size() | 
					
					
						
						| 
							 | 
						        x = x.contiguous() | 
					
					
						
						| 
							 | 
						        x = x.view(n, w, int(h / 2), int(c * 2)) | 
					
					
						
						| 
							 | 
						        x = x.permute(0, 2, 1, 3).contiguous() | 
					
					
						
						| 
							 | 
						        x = x.view(n, int(h / 2), int(w / 2), int(c * 4)) | 
					
					
						
						| 
							 | 
						        x = x.permute(0, 2, 1, 3).contiguous() | 
					
					
						
						| 
							 | 
						        return x | 
					
					
						
						| 
							 | 
						
 | 
					
					
						
						| 
							 | 
						
 | 
					
					
						
						| 
							 | 
						class DownSample2x2BlockFix(nn.Module): | 
					
					
						
						| 
							 | 
						    def forward(self, x): | 
					
					
						
						| 
							 | 
						        vit_embeds = x | 
					
					
						
						| 
							 | 
						        h = w = int(vit_embeds.shape[1] ** 0.5) | 
					
					
						
						| 
							 | 
						        vit_embeds = vit_embeds.reshape(vit_embeds.shape[0], h, w, -1) | 
					
					
						
						| 
							 | 
						        vit_embeds = flat_square_2x2(vit_embeds) | 
					
					
						
						| 
							 | 
						        vit_embeds = vit_embeds.reshape(vit_embeds.shape[0], -1, vit_embeds.shape[-1]) | 
					
					
						
						| 
							 | 
						        return vit_embeds | 
					
					
						
						| 
							 | 
						
 | 
					
					
						
						| 
							 | 
						
 | 
					
					
						
						| 
							 | 
						def flat_square_2x2(x): | 
					
					
						
						| 
							 | 
						    n, w, h, c = x.size() | 
					
					
						
						| 
							 | 
						    if w % 2 == 1: | 
					
					
						
						| 
							 | 
						        x = torch.concat([x, torch.zeros((n, 1, h, c), dtype=x.dtype).to(x.device)], dim=1).contiguous() | 
					
					
						
						| 
							 | 
						        n, w, h, c = x.size() | 
					
					
						
						| 
							 | 
						    x = x.contiguous() | 
					
					
						
						| 
							 | 
						    if h % 2 == 1: | 
					
					
						
						| 
							 | 
						        x = torch.concat([x, torch.zeros((n, w, 1, c), dtype=x.dtype).to(x.device)], dim=2).contiguous() | 
					
					
						
						| 
							 | 
						        n, w, h, c = x.size() | 
					
					
						
						| 
							 | 
						    x = x.view(n, w, int(h / 2), int(c * 2)) | 
					
					
						
						| 
							 | 
						    x = x.permute(0, 2, 1, 3).contiguous() | 
					
					
						
						| 
							 | 
						    x = x.view(n, int(h / 2), int(w / 2), int(c * 4)) | 
					
					
						
						| 
							 | 
						    x = x.permute(0, 2, 1, 3).contiguous() | 
					
					
						
						| 
							 | 
						    return x | 
					
					
						
						| 
							 | 
						
 | 
					
					
						
						| 
							 | 
						
 | 
					
					
						
						| 
							 | 
						class DownSample3x3BlockFix(nn.Module): | 
					
					
						
						| 
							 | 
						    def forward(self, x): | 
					
					
						
						| 
							 | 
						        vit_embeds = x | 
					
					
						
						| 
							 | 
						        h = w = int(vit_embeds.shape[1] ** 0.5) | 
					
					
						
						| 
							 | 
						        vit_embeds = vit_embeds.reshape(vit_embeds.shape[0], h, w, -1) | 
					
					
						
						| 
							 | 
						        vit_embeds = flat_square_3x3(vit_embeds) | 
					
					
						
						| 
							 | 
						        vit_embeds = vit_embeds.reshape(vit_embeds.shape[0], -1, vit_embeds.shape[-1]) | 
					
					
						
						| 
							 | 
						        return vit_embeds | 
					
					
						
						| 
							 | 
						
 | 
					
					
						
						| 
							 | 
						
 | 
					
					
						
						| 
							 | 
						def flat_square_3x3(x): | 
					
					
						
						| 
							 | 
						    n, w, h, c = x.size() | 
					
					
						
						| 
							 | 
						    if w % 3 != 0: | 
					
					
						
						| 
							 | 
						        x = torch.concat([x, torch.zeros((n, 3 - (w % 3), h, c), dtype=x.dtype).to(x.device)], dim=1).contiguous() | 
					
					
						
						| 
							 | 
						        n, w, h, c = x.size() | 
					
					
						
						| 
							 | 
						    x = x.contiguous() | 
					
					
						
						| 
							 | 
						    if h % 3 != 0: | 
					
					
						
						| 
							 | 
						        x = torch.concat([x, torch.zeros((n, w, 3 - (h % 3), c), dtype=x.dtype).to(x.device)], dim=2).contiguous() | 
					
					
						
						| 
							 | 
						        n, w, h, c = x.size() | 
					
					
						
						| 
							 | 
						    x = x.view(n, w, int(h / 3), int(c * 3)) | 
					
					
						
						| 
							 | 
						    x = x.permute(0, 2, 1, 3).contiguous() | 
					
					
						
						| 
							 | 
						    x = x.view(n, int(h / 3), int(w / 3), int(c * 9)) | 
					
					
						
						| 
							 | 
						    x = x.permute(0, 2, 1, 3).contiguous() | 
					
					
						
						| 
							 | 
						    return x | 
					
					
						
						| 
							 | 
						
 | 
					
					
						
						| 
							 | 
						
 | 
					
					
						
						| 
							 | 
						class MultimodalProjectorConfig(PretrainedConfig): | 
					
					
						
						| 
							 | 
						    model_type = "v2l_projector" | 
					
					
						
						| 
							 | 
						
 | 
					
					
						
						| 
							 | 
						    def __init__(self, mm_projector_type: str = None, **kwargs): | 
					
					
						
						| 
							 | 
						        super().__init__() | 
					
					
						
						| 
							 | 
						        self.mm_projector_type = mm_projector_type | 
					
					
						
						| 
							 | 
						
 | 
					
					
						
						| 
							 | 
						
 | 
					
					
						
						| 
							 | 
						class MultimodalProjector(PreTrainedModel): | 
					
					
						
						| 
							 | 
						    config_class = MultimodalProjectorConfig | 
					
					
						
						| 
							 | 
						
 | 
					
					
						
						| 
							 | 
						    def __init__(self, mm_projector_cfg: MultimodalProjectorConfig, config: PretrainedConfig): | 
					
					
						
						| 
							 | 
						        super().__init__(mm_projector_cfg) | 
					
					
						
						| 
							 | 
						        mm_projector_type = mm_projector_cfg.mm_projector_type | 
					
					
						
						| 
							 | 
						        self.downsample_rate = 1 | 
					
					
						
						| 
							 | 
						        if mm_projector_type == "identity": | 
					
					
						
						| 
							 | 
						            self.layers = IdentityMap() | 
					
					
						
						| 
							 | 
						        elif mm_projector_type == "linear": | 
					
					
						
						| 
							 | 
						            self.layers = nn.Linear(config.mm_hidden_size, config.hidden_size) | 
					
					
						
						| 
							 | 
						        elif mm_projector_type == "mlp_downsample": | 
					
					
						
						| 
							 | 
						            self.layers = nn.Sequential( | 
					
					
						
						| 
							 | 
						                DownSampleBlock(), | 
					
					
						
						| 
							 | 
						                nn.LayerNorm(config.mm_hidden_size * 4), | 
					
					
						
						| 
							 | 
						                nn.Linear(config.mm_hidden_size * 4, config.hidden_size), | 
					
					
						
						| 
							 | 
						                nn.GELU(), | 
					
					
						
						| 
							 | 
						                nn.Linear(config.hidden_size, config.hidden_size), | 
					
					
						
						| 
							 | 
						            ) | 
					
					
						
						| 
							 | 
						            self.downsample_rate = 2 | 
					
					
						
						| 
							 | 
						        elif mm_projector_type == "mlp_downsample_2x2_fix": | 
					
					
						
						| 
							 | 
						            self.layers = nn.Sequential( | 
					
					
						
						| 
							 | 
						                DownSample2x2BlockFix(), | 
					
					
						
						| 
							 | 
						                nn.LayerNorm(config.mm_hidden_size * 4), | 
					
					
						
						| 
							 | 
						                nn.Linear(config.mm_hidden_size * 4, config.hidden_size), | 
					
					
						
						| 
							 | 
						                nn.GELU(), | 
					
					
						
						| 
							 | 
						                nn.Linear(config.hidden_size, config.hidden_size), | 
					
					
						
						| 
							 | 
						            ) | 
					
					
						
						| 
							 | 
						            self.downsample_rate = 2 | 
					
					
						
						| 
							 | 
						        elif mm_projector_type == "mlp_downsample_3x3_fix": | 
					
					
						
						| 
							 | 
						            self.layers = nn.Sequential( | 
					
					
						
						| 
							 | 
						                DownSample3x3BlockFix(), | 
					
					
						
						| 
							 | 
						                nn.LayerNorm(config.mm_hidden_size * 9), | 
					
					
						
						| 
							 | 
						                nn.Linear(config.mm_hidden_size * 9, config.mm_hidden_size * 3), | 
					
					
						
						| 
							 | 
						                nn.GELU(), | 
					
					
						
						| 
							 | 
						                nn.LayerNorm(config.mm_hidden_size * 3), | 
					
					
						
						| 
							 | 
						                nn.Linear(config.mm_hidden_size * 3, config.hidden_size), | 
					
					
						
						| 
							 | 
						                nn.GELU(), | 
					
					
						
						| 
							 | 
						                nn.Linear(config.hidden_size, config.hidden_size), | 
					
					
						
						| 
							 | 
						            ) | 
					
					
						
						| 
							 | 
						            self.downsample_rate = 3 | 
					
					
						
						| 
							 | 
						        elif mm_projector_type == "mlp_downsample_3x3_s2": | 
					
					
						
						| 
							 | 
						            self.layers = nn.Sequential( | 
					
					
						
						| 
							 | 
						                DownSample3x3BlockFix(), | 
					
					
						
						| 
							 | 
						                nn.LayerNorm(config.mm_hidden_size * 9), | 
					
					
						
						| 
							 | 
						                nn.Linear(config.mm_hidden_size * 9, config.mm_hidden_size * 3), | 
					
					
						
						| 
							 | 
						                nn.GELU(), | 
					
					
						
						| 
							 | 
						                nn.LayerNorm(config.mm_hidden_size * 3), | 
					
					
						
						| 
							 | 
						                nn.Linear(config.mm_hidden_size * 3, config.mm_hidden_size), | 
					
					
						
						| 
							 | 
						                nn.GELU(), | 
					
					
						
						| 
							 | 
						                nn.LayerNorm(config.mm_hidden_size), | 
					
					
						
						| 
							 | 
						                nn.Linear(config.mm_hidden_size, config.mm_hidden_size // 3), | 
					
					
						
						| 
							 | 
						                nn.GELU(), | 
					
					
						
						| 
							 | 
						                nn.LayerNorm(config.mm_hidden_size // 3), | 
					
					
						
						| 
							 | 
						                nn.Linear(config.mm_hidden_size // 3, config.hidden_size), | 
					
					
						
						| 
							 | 
						                nn.GELU(), | 
					
					
						
						| 
							 | 
						                nn.Linear(config.hidden_size, config.hidden_size), | 
					
					
						
						| 
							 | 
						            ) | 
					
					
						
						| 
							 | 
						        elif mm_projector_type == "mlp_downsample_3x3_s2_new": | 
					
					
						
						| 
							 | 
						            self.layers = nn.Sequential( | 
					
					
						
						| 
							 | 
						                DownSample3x3BlockFix(), | 
					
					
						
						| 
							 | 
						                nn.LayerNorm(config.mm_hidden_size * 9), | 
					
					
						
						| 
							 | 
						                nn.Linear(config.mm_hidden_size * 9, config.mm_hidden_size * 4), | 
					
					
						
						| 
							 | 
						                nn.GELU(), | 
					
					
						
						| 
							 | 
						                nn.LayerNorm(config.mm_hidden_size * 4), | 
					
					
						
						| 
							 | 
						                nn.Linear(config.mm_hidden_size * 4, config.mm_hidden_size * 2), | 
					
					
						
						| 
							 | 
						                nn.GELU(), | 
					
					
						
						| 
							 | 
						                nn.LayerNorm(config.mm_hidden_size * 2), | 
					
					
						
						| 
							 | 
						                nn.Linear(config.mm_hidden_size * 2, config.mm_hidden_size), | 
					
					
						
						| 
							 | 
						                nn.GELU(), | 
					
					
						
						| 
							 | 
						                nn.LayerNorm(config.mm_hidden_size), | 
					
					
						
						| 
							 | 
						                nn.Linear(config.mm_hidden_size, config.mm_hidden_size // 3), | 
					
					
						
						| 
							 | 
						                nn.GELU(), | 
					
					
						
						| 
							 | 
						                nn.LayerNorm(config.mm_hidden_size // 3), | 
					
					
						
						| 
							 | 
						                nn.Linear(config.mm_hidden_size // 3, config.hidden_size), | 
					
					
						
						| 
							 | 
						                nn.GELU(), | 
					
					
						
						| 
							 | 
						                nn.Linear(config.hidden_size, config.hidden_size), | 
					
					
						
						| 
							 | 
						            ) | 
					
					
						
						| 
							 | 
						        else: | 
					
					
						
						| 
							 | 
						            mlp_gelu_match = re.match(r"^mlp(\d+)x_gelu$", mm_projector_type) | 
					
					
						
						| 
							 | 
						            if mlp_gelu_match: | 
					
					
						
						| 
							 | 
						                mlp_depth = int(mlp_gelu_match.group(1)) | 
					
					
						
						| 
							 | 
						                modules = [nn.Linear(config.mm_hidden_size, config.hidden_size)] | 
					
					
						
						| 
							 | 
						                for _ in range(1, mlp_depth): | 
					
					
						
						| 
							 | 
						                    modules.append(nn.GELU()) | 
					
					
						
						| 
							 | 
						                    modules.append(nn.Linear(config.hidden_size, config.hidden_size)) | 
					
					
						
						| 
							 | 
						                self.layers = nn.Sequential(*modules) | 
					
					
						
						| 
							 | 
						            else: | 
					
					
						
						| 
							 | 
						                raise ValueError(f"Unknown projector type: {mm_projector_type}") | 
					
					
						
						| 
							 | 
						
 | 
					
					
						
						| 
							 | 
						    def forward(self, x, *args, **kwargs): | 
					
					
						
						| 
							 | 
						        return self.layers(x) | 
					
					
						
						| 
							 | 
						
 | 
					
					
						
						| 
							 | 
						
 | 
					
					
						
						| 
							 | 
						 | 
					
					
						
						| 
							 | 
						 | 
					
					
						
						| 
							 | 
						
 |