我很高兴发现该代码已在以下论文中使用和引用:
多米诺骨牌: Eyuboglu et Life-toss-Modal嵌入的系统误差。 al。在ICLR 2022
GSCLIP: Zhu等人解释自然语言分布变化的框架。 al。在ICML 2022
semeval-2022任务5:探索Cuervo等人对厌恶女性模因的多模式检测的对比学习。 al。在Semeval-2022
CDSBERT- HALLEE等人的密码子意识扩展蛋白质语言模型。 al。来自特拉华大学(2023年9月)
Enigma-51:在Ragusa等人的工业场景中对人类对象的相互作用进行细粒度的了解。 al。 (2023年11月)
您可以在此GitHub存储库页面的正确部分上找到命名的引用信息:引用此存储库或使用以下引用信息。
@software { Shariatnia_Simple_CLIP_2021 ,
author = { Shariatnia, M. Moein } ,
doi = { 10.5281/zenodo.6845731 } ,
month = { 4 } ,
title = { {Simple CLIP} } ,
version = { 1.0.0 } ,
year = { 2021 }
}在2021年1月, Openai宣布了两个新型号: Dall-E和Clip ,这两种模型都以某种方式连接文本和图像。在本文中,我们将在Pytorch中从头开始实现剪辑模型。 Openai开源了一些与剪辑模型有关的代码,但我发现它令人生畏,而且远非简短而简单。我还遇到了一个很好的教程,该教程灵感来自Keras Code示例的剪辑模型,然后将其中的某些部分翻译成Pytorch,以完全使用我们心爱的Pytorch构建本教程!
在从自然语言监督论文中学习可转移的视觉模型时,OpenAI介绍了其新模型,该模型称为剪辑,以进行对比语言图像预训练。简而言之,该模型了解了整个句子与其描述的图像之间的关系。从某种意义上说,当训练模型时,鉴于输入句子,它将能够检索与该句子相对应的最相关的图像。这里重要的是,它是对完整句子的培训,而不是诸如汽车,狗等的单一类培训。直觉是,在对整个句子进行培训时,该模型可以学习更多的东西,并在图像和文本之间找到一些模式。他们还表明,当该模型在庞大的图像数据集及其相应的文本中训练时,它也可以充当分类器。我鼓励您研究论文,以了解有关这种令人兴奋的模型及其在基准数据集中的惊人结果的更多信息。仅提到,使用此策略训练的剪辑模型比在ImageNet本身中针对唯一的分类任务进行了优化的SOTA模型,对IMATENET进行了训练!
作为一个预告片(!),让我们看看我们将在本文中从头开始构建的最终模型能够:给定查询(原始文本),例如“一个男孩跳滑板跳着滑板”或“从秋千上跳下来的女孩”,该模型将检索最相关的图像:

让我们看看更多的输出:

# !pip install timm
# !pip install transformers import os
import cv2
import gc
import numpy as np
import pandas as pd
import itertools
from tqdm . autonotebook import tqdm
import albumentations as A
import torch
from torch import nn
import torch . nn . functional as F
import timm
from transformers import DistilBertModel , DistilBertConfig , DistilBertTokenizer 关于Config和CFG的注释:我用Python脚本编写了代码,然后将其转换为Jupyter笔记本。因此,在Python脚本的情况下,配置是一个普通的Python文件,我将所有超参数放置在其中,而对于Jupyter Notebook,它是在笔记本开始时定义的类,以保留所有超级参数。
class CFG :
debug = False
image_path = "C:/Moein/AI/Datasets/Flicker-8k/Images"
captions_path = "C:/Moein/AI/Datasets/Flicker-8k"
batch_size = 32
num_workers = 4
head_lr = 1e-3
image_encoder_lr = 1e-4
text_encoder_lr = 1e-5
weight_decay = 1e-3
patience = 1
factor = 0.8
epochs = 4
device = torch . device ( "cuda" if torch . cuda . is_available () else "cpu" )
model_name = 'resnet50'
image_embedding = 2048
text_encoder_model = "distilbert-base-uncased"
text_embedding = 768
text_tokenizer = "distilbert-base-uncased"
max_length = 200
pretrained = True # for both image encoder and text encoder
trainable = True # for both image encoder and text encoder
temperature = 1.0
# image size
size = 224
# for projection head; used for both image and text encoders
num_projection_layers = 1
projection_dim = 256
dropout = 0.1 class AvgMeter :
def __init__ ( self , name = "Metric" ):
self . name = name
self . reset ()
def reset ( self ):
self . avg , self . sum , self . count = [ 0 ] * 3
def update ( self , val , count = 1 ):
self . count += count
self . sum += val * count
self . avg = self . sum / self . count
def __repr__ ( self ):
text = f" { self . name } : { self . avg :.4f } "
return text
def get_lr ( optimizer ):
for param_group in optimizer . param_groups :
return param_group [ "lr" ]正如您在本文的图像中看到的那样,我们需要对图像及其描述文本进行编码。因此,数据集需要返回图像和文本。当然,我们不会将原始文本馈送到我们的文本编码器!我们将使用HuggingFace库中的Distilbert模型(比Bert小,但表现差不多)与我们的文本编码器一样;因此,我们需要用Diskilbert Tokenizer将句子(字幕)归为句子(字幕),然后将令牌ID(Input_IDS)和注意力掩盖喂入Distilbert。因此,数据集也需要照顾令牌化。在下面,您可以看到数据集的代码。在此下面,我将解释代码中最重要的事情。
在__init__中,我们会收到一个令牌对象,这实际上是一个拥抱面托犬。运行模型时将加载此令牌。我们正在将字幕填充并截断为指定的max_length。在__getItem__中,我们将首先加载一个编码的标题,该字典是带有键input_ids和coative_mask的字典,使张量从其值中取出,然后我们将加载相应的映像,转换和增强它(如果有任何!最后,我们仅出于可视化目的将标题的原始文本放入字典中的键“标题”。
我没有使用其他数据增强,但是如果您想提高模型的性能,则可以添加它们。
class CLIPDataset ( torch . utils . data . Dataset ):
def __init__ ( self , image_filenames , captions , tokenizer , transforms ):
"""
image_filenames and cpations must have the same length; so, if there are
multiple captions for each image, the image_filenames must have repetitive
file names
"""
self . image_filenames = image_filenames
self . captions = list ( captions )
self . encoded_captions = tokenizer (
list ( captions ), padding = True , truncation = True , max_length = CFG . max_length
)
self . transforms = transforms
def __getitem__ ( self , idx ):
item = {
key : torch . tensor ( values [ idx ])
for key , values in self . encoded_captions . items ()
}
image = cv2 . imread ( f" { CFG . image_path } / { self . image_filenames [ idx ] } " )
image = cv2 . cvtColor ( image , cv2 . COLOR_BGR2RGB )
image = self . transforms ( image = image )[ 'image' ]
item [ 'image' ] = torch . tensor ( image ). permute ( 2 , 0 , 1 ). float ()
item [ 'caption' ] = self . captions [ idx ]
return item
def __len__ ( self ):
return len ( self . captions )
def get_transforms ( mode = "train" ):
if mode == "train" :
return A . Compose (
[
A . Resize ( CFG . size , CFG . size , always_apply = True ),
A . Normalize ( max_pixel_value = 255.0 , always_apply = True ),
]
)
else :
return A . Compose (
[
A . Resize ( CFG . size , CFG . size , always_apply = True ),
A . Normalize ( max_pixel_value = 255.0 , always_apply = True ),
]
)图像编码器代码直截了当。我在此处使用Pytorch Image Models库(TIMM),这使许多不同的映像模型可从Resnets到EditiveNets等等。在这里,我们将使用Resnet50作为图像编码器。如果您不想安装新库,则可以轻松使用Torchvision库使用Resnets。
代码将每个图像编码为固定大小向量,并具有模型输出通道的大小(如果是Resnet50,则向量大小为2048 )。这是NN.Adaptiveavgpool2d()层之后的输出。
class ImageEncoder ( nn . Module ):
"""
Encode images to a fixed size vector
"""
def __init__ (
self , model_name = CFG . model_name , pretrained = CFG . pretrained , trainable = CFG . trainable
):
super (). __init__ ()
self . model = timm . create_model (
model_name , pretrained , num_classes = 0 , global_pool = "avg"
)
for p in self . model . parameters ():
p . requires_grad = trainable
def forward ( self , x ):
return self . model ( x )正如我之前提到的,我将使用Distilbert作为文本编码器。像更大的兄弟伯特一样,将添加两个特殊的令牌: CLS和SEP的实际输入令牌:标志着句子的开始和结尾。为了抓住句子的全部表示(如相关的Bert和Distilbert Papers所指出的那样),我们使用CLS令牌的最终表示形式,我们希望这种表示能够捕获句子的整体含义(标题)。以这种方式思考,它类似于我们对图像所做的事情,并将它们转换为固定尺寸的向量。
在Distilbert(以及Bert)的情况下,每个令牌的输出隐藏表示形式是一个尺寸768的向量。因此,整个标题将在大小为768的CLS令牌表示中编码。
class TextEncoder ( nn . Module ):
def __init__ ( self , model_name = CFG . text_encoder_model , pretrained = CFG . pretrained , trainable = CFG . trainable ):
super (). __init__ ()
if pretrained :
self . model = DistilBertModel . from_pretrained ( model_name )
else :
self . model = DistilBertModel ( config = DistilBertConfig ())
for p in self . model . parameters ():
p . requires_grad = trainable
# we are using the CLS token hidden representation as the sentence's embedding
self . target_token_idx = 0
def forward ( self , input_ids , attention_mask ):
output = self . model ( input_ids = input_ids , attention_mask = attention_mask )
last_hidden_state = output . last_hidden_state
return last_hidden_state [:, self . target_token_idx , :]我使用KERAS代码示例的投影头实现来在Pytorch中编写以下内容。现在,我们已经将图像和文本编码为固定尺寸的向量(图像为2048,文本为768),我们需要将它们(项目)带入一个新世界(!),图像和文本都具有相似的尺寸,以便能够比较它们并将其推开,并将其推开,并将其推开,并将非相关的图像和文本与匹配的那些匹配的图像和文本拉在一起。因此,以下代码将使2048和768维向量进入256(投射_dim)维度世界,我们可以在其中比较它们。
“ embedding_dim”是输入向量的大小(图像的2048,文本为768)和“ propption_dim”是输出向量的大小,我们的情况将为256。为了了解此部分的详细信息,您可以参考剪辑纸。
class ProjectionHead ( nn . Module ):
def __init__ (
self ,
embedding_dim ,
projection_dim = CFG . projection_dim ,
dropout = CFG . dropout
):
super (). __init__ ()
self . projection = nn . Linear ( embedding_dim , projection_dim )
self . gelu = nn . GELU ()
self . fc = nn . Linear ( projection_dim , projection_dim )
self . dropout = nn . Dropout ( dropout )
self . layer_norm = nn . LayerNorm ( projection_dim )
def forward ( self , x ):
projected = self . projection ( x )
x = self . gelu ( projected )
x = self . fc ( x )
x = self . dropout ( x )
x = x + projected
x = self . layer_norm ( x )
return x 这部分是所有乐趣发生的地方!我还将在这里谈论损失功能。我将一些代码从KERAS代码示例转换为Pytorch,以编写此部分。查看代码,然后阅读此代码块下面的说明。
在这里,我们将使用我们构建的先前模块来实现主模型。 __init__函数是自我解释的。在正向函数中,我们首先将图像和文本分别编码为固定尺寸向量(具有不同的维度)。之后,使用单独的投影模块,我们将它们投影到我之前谈到的那个共享世界(空间)。在这里,编码将变成相似的形状(在我们的情况下为256)。之后,我们将计算损失。再次,我建议阅读剪贴纸以使其变得更好,但我会尽力解释这部分。
在线性代数中,测量两个向量是否具有相似特征(它们彼此相似)的一种常见方法是计算其点产物(乘以匹配的条目并拿走它们的总和);如果最终数量很大,那么它们是相同的,如果它很小,则不会(相对而言)!
好的!我刚才说的是要了解这种损失功能的最重要的事情。让我们继续。我们谈到了两个向量,但是,我们在这里有什么?我们有image_embeddings,具有形状(batch_size,256)的矩阵和带状的text_embeddings(batch_size,256)。很容易!这意味着我们有两组向量,而不是两个向量。我们如何衡量类似的两组向量(两个矩阵)彼此之间的样子?同样,使用点产品(在这种情况下,Pytorch中的@运算符会执行点产品或矩阵乘法)。为了能够将这两个矩阵倍增,我们将第二个矩阵转换。好的,我们获得了一个带有形状的矩阵(batch_size,batch_size),我们将调用logits。 (在我们的情况下,温度等于1.0,因此,它不会有所作为。您可以使用它并查看它的差异。还请查看纸张以查看为什么它在这里!)。
我希望你仍然和我在一起!如果没有,请检查代码并检查其形状。现在我们有了逻辑,我们需要目标。我需要说,有一种更直接的方法来获得目标,但我必须为我们的情况做到这一点(我将在下一段中谈论为什么)。
让我们考虑一下我们希望该模型学习的内容:我们希望它为给定的图像学习“相似的表示(向量)”和描述它的标题。这意味着我们要么给它一个图像,要么是描述它的文本,我们希望它为两者产生相同的256个大小的向量。
class CLIPModel ( nn . Module ):
def __init__ (
self ,
temperature = CFG . temperature ,
image_embedding = CFG . image_embedding ,
text_embedding = CFG . text_embedding ,
):
super (). __init__ ()
self . image_encoder = ImageEncoder ()
self . text_encoder = TextEncoder ()
self . image_projection = ProjectionHead ( embedding_dim = image_embedding )
self . text_projection = ProjectionHead ( embedding_dim = text_embedding )
self . temperature = temperature
def forward ( self , batch ):
# Getting Image and Text Features
image_features = self . image_encoder ( batch [ "image" ])
text_features = self . text_encoder (
input_ids = batch [ "input_ids" ], attention_mask = batch [ "attention_mask" ]
)
# Getting Image and Text Embeddings (with same dimension)
image_embeddings = self . image_projection ( image_features )
text_embeddings = self . text_projection ( text_features )
# Calculating the Loss
logits = ( text_embeddings @ image_embeddings . T ) / self . temperature
images_similarity = image_embeddings @ image_embeddings . T
texts_similarity = text_embeddings @ text_embeddings . T
targets = F . softmax (
( images_similarity + texts_similarity ) / 2 * self . temperature , dim = - 1
)
texts_loss = cross_entropy ( logits , targets , reduction = 'none' )
images_loss = cross_entropy ( logits . T , targets . T , reduction = 'none' )
loss = ( images_loss + texts_loss ) / 2.0 # shape: (batch_size)
return loss . mean ()
def cross_entropy ( preds , targets , reduction = 'none' ):
log_softmax = nn . LogSoftmax ( dim = - 1 )
loss = ( - targets * log_softmax ( preds )). sum ( 1 )
if reduction == "none" :
return loss
elif reduction == "mean" :
return loss . mean ()因此,在最佳情况下,text_embeddings和image_embedding矩阵应该相同,因为它们正在描述类似的内容。现在让我们考虑一下:如果发生这种情况,logits矩阵会是什么样?让我们看看一个简单的例子!
# A simple Example
batch_size = 4
dim = 256
embeddings = torch . randn ( batch_size , dim )
out = embeddings @ embeddings . T
print ( F . softmax ( out , dim = - 1 ))因此,在最好的情况下,logits将是一个矩阵,如果我们采用其SoftMax,在对角线中将具有1.0级(一个以精美的单词来称呼它的身份矩阵!)。由于损失函数的工作是使模型的预测与目标相似(至少在大多数情况下!),我们希望这样的矩阵作为目标。这就是为什么我们在上面的代码块中计算images_simurility和texts_simarlity矩阵的原因。
现在,我们已经拥有目标矩阵,我们将使用简单的横熵来计算实际损失。我已经将跨熵的完整矩阵形式写为一个函数,您可以在代码块的底部看到。好的!我们完成了!这不是简单吗?好吧,您可以忽略下一个段落,但是如果您很好奇,那么其中有一个重要的注释。
这就是为什么我没有使用更简单的方法:我需要承认有一种更简单的方法来计算Pytorch中的这种损失;通过这样做:nn.crossentropyloss()(logits,torch.arange(batch_size))。为什么我在这里不使用它?出于两个原因。 1-我们正在使用的数据集具有单个图像的多个字幕;因此,有可能在批处理中存在两个具有相似标题的相同标题的相同图像(很少见,但可能发生)。用这种更轻松的方法损失将忽略这种可能性,并且该模型学会了将两个实际上相同的表示形式拉开(假设它们不同)。显然,我们不希望发生这种情况,因此我以照顾这些边缘情况的方式计算了整个目标矩阵。 2-按照我的方式这样做,使我更好地了解了此损失功能中正在发生的事情;因此,我认为这也可以为您提供更好的直觉!
以下是一些功能,可以帮助我们加载火车和有效的数据加载器,我们的模型,然后训练和评估我们的模型。这里没有太多发生。只是简单的训练循环和实用程序功能
def make_train_valid_dfs ():
dataframe = pd . read_csv ( f" { CFG . captions_path } /captions.csv" )
max_id = dataframe [ "id" ]. max () + 1 if not CFG . debug else 100
image_ids = np . arange ( 0 , max_id )
np . random . seed ( 42 )
valid_ids = np . random . choice (
image_ids , size = int ( 0.2 * len ( image_ids )), replace = False
)
train_ids = [ id_ for id_ in image_ids if id_ not in valid_ids ]
train_dataframe = dataframe [ dataframe [ "id" ]. isin ( train_ids )]. reset_index ( drop = True )
valid_dataframe = dataframe [ dataframe [ "id" ]. isin ( valid_ids )]. reset_index ( drop = True )
return train_dataframe , valid_dataframe
def build_loaders ( dataframe , tokenizer , mode ):
transforms = get_transforms ( mode = mode )
dataset = CLIPDataset (
dataframe [ "image" ]. values ,
dataframe [ "caption" ]. values ,
tokenizer = tokenizer ,
transforms = transforms ,
)
dataloader = torch . utils . data . DataLoader (
dataset ,
batch_size = CFG . batch_size ,
num_workers = CFG . num_workers ,
shuffle = True if mode == "train" else False ,
)
return dataloader这是训练我们的模型的方便功能。这里没有太多发生。只需加载批处理,将它们馈入模型,然后踩下优化器和LR_SCHEDULER。
def train_epoch ( model , train_loader , optimizer , lr_scheduler , step ):
loss_meter = AvgMeter ()
tqdm_object = tqdm ( train_loader , total = len ( train_loader ))
for batch in tqdm_object :
batch = { k : v . to ( CFG . device ) for k , v in batch . items () if k != "caption" }
loss = model ( batch )
optimizer . zero_grad ()
loss . backward ()
optimizer . step ()
if step == "batch" :
lr_scheduler . step ()
count = batch [ "image" ]. size ( 0 )
loss_meter . update ( loss . item (), count )
tqdm_object . set_postfix ( train_loss = loss_meter . avg , lr = get_lr ( optimizer ))
return loss_meter
def valid_epoch ( model , valid_loader ):
loss_meter = AvgMeter ()
tqdm_object = tqdm ( valid_loader , total = len ( valid_loader ))
for batch in tqdm_object :
batch = { k : v . to ( CFG . device ) for k , v in batch . items () if k != "caption" }
loss = model ( batch )
count = batch [ "image" ]. size ( 0 )
loss_meter . update ( loss . item (), count )
tqdm_object . set_postfix ( valid_loss = loss_meter . avg )
return loss_meter
def main ():
train_df , valid_df = make_train_valid_dfs ()
tokenizer = DistilBertTokenizer . from_pretrained ( CFG . text_tokenizer )
train_loader = build_loaders ( train_df , tokenizer , mode = "train" )
valid_loader = build_loaders ( valid_df , tokenizer , mode = "valid" )
model = CLIPModel (). to ( CFG . device )
params = [
{ "params" : model . image_encoder . parameters (), "lr" : CFG . image_encoder_lr },
{ "params" : model . text_encoder . parameters (), "lr" : CFG . text_encoder_lr },
{ "params" : itertools . chain (
model . image_projection . parameters (), model . text_projection . parameters ()
), "lr" : CFG . head_lr , "weight_decay" : CFG . weight_decay }
]
optimizer = torch . optim . AdamW ( params , weight_decay = 0. )
lr_scheduler = torch . optim . lr_scheduler . ReduceLROnPlateau (
optimizer , mode = "min" , patience = CFG . patience , factor = CFG . factor
)
step = "epoch"
best_loss = float ( 'inf' )
for epoch in range ( CFG . epochs ):
print ( f"Epoch: { epoch + 1 } " )
model . train ()
train_loss = train_epoch ( model , train_loader , optimizer , lr_scheduler , step )
model . eval ()
with torch . no_grad ():
valid_loss = valid_epoch ( model , valid_loader )
if valid_loss . avg < best_loss :
best_loss = valid_loss . avg
torch . save ( model . state_dict (), "best.pt" )
print ( "Saved Best Model!" )
lr_scheduler . step ( valid_loss . avg )运行下一个单元格启动训练模型。将内核处于GPU模式。每个时代都应在GPU上花费大约24分钟(即使一个时代就足够了!)。可能需要一分钟的时间才能真正开始训练,因为我们将在火车和有效数据集中编码所有字幕,因此请不要停止它!每件事都很好。
main ()好的!我们已经完成了训练模型。现在,我们需要进行推断,在我们的情况下,这将为模型提供一条文本,并希望它从看不见的验证(或测试)集中检索最相关的图像。
在此功能中,我们正在加载训练后保存的模型,以验证设置为IT图像馈送图像,并以Shape(valif_set_size,256)和模型本身返回image_embeddings。
def get_image_embeddings ( valid_df , model_path ):
tokenizer = DistilBertTokenizer . from_pretrained ( CFG . text_tokenizer )
valid_loader = build_loaders ( valid_df , tokenizer , mode = "valid" )
model = CLIPModel (). to ( CFG . device )
model . load_state_dict ( torch . load ( model_path , map_location = CFG . device ))
model . eval ()
valid_image_embeddings = []
with torch . no_grad ():
for batch in tqdm ( valid_loader ):
image_features = model . image_encoder ( batch [ "image" ]. to ( CFG . device ))
image_embeddings = model . image_projection ( image_features )
valid_image_embeddings . append ( image_embeddings )
return model , torch . cat ( valid_image_embeddings ) _ , valid_df = make_train_valid_dfs ()
model , image_embeddings = get_image_embeddings ( valid_df , "best.pt" )此功能执行我们希望模型能够能够有能力的最终任务:它获取模型,image_embeddings和文本查询。它将显示验证集中最相关的图像!这不是很棒吗?让我们看看它的性能毕竟!
def find_matches ( model , image_embeddings , query , image_filenames , n = 9 ):
tokenizer = DistilBertTokenizer . from_pretrained ( CFG . text_tokenizer )
encoded_query = tokenizer ([ query ])
batch = {
key : torch . tensor ( values ). to ( CFG . device )
for key , values in encoded_query . items ()
}
with torch . no_grad ():
text_features = model . text_encoder (
input_ids = batch [ "input_ids" ], attention_mask = batch [ "attention_mask" ]
)
text_embeddings = model . text_projection ( text_features )
image_embeddings_n = F . normalize ( image_embeddings , p = 2 , dim = - 1 )
text_embeddings_n = F . normalize ( text_embeddings , p = 2 , dim = - 1 )
dot_similarity = text_embeddings_n @ image_embeddings_n . T
values , indices = torch . topk ( dot_similarity . squeeze ( 0 ), n * 5 )
matches = [ image_filenames [ idx ] for idx in indices [:: 5 ]]
_ , axes = plt . subplots ( 3 , 3 , figsize = ( 10 , 10 ))
for match , ax in zip ( matches , axes . flatten ()):
image = cv2 . imread ( f" { CFG . image_path } / { match } " )
image = cv2 . cvtColor ( image , cv2 . COLOR_BGR2RGB )
ax . imshow ( image )
ax . axis ( "off" )
plt . show ()这就是我们使用此功能的方式。结果:结果:
find_matches ( model ,
image_embeddings ,
query = "a group of people dancing in a party" ,
image_filenames = valid_df [ 'image' ]. values ,
n = 9 )
希望您喜欢这篇文章。对我来说,实施本文是一个非常有趣的经历。我要感谢Khalid Salama提供了他提供的出色的Keras代码示例,这激发了我在Pytorch中写类似的东西。