1.简介

“缩小闭源和开源视频基础模型之间的差距,加速社区探索。”——混元团队

-

Hunyuan-Video是由腾讯推出的一款高质量的中文通用视频生成模型,它以其卓越的性能和开源的特性,成为了目前开源视频生成基座模型中的佼佼者。该模型支持中文输入提示(Prompt),采用了图像-视频联合训练策略,并通过一系列精细的数据过滤技术,确保了视频的技术质量和审美吸引力。

Hunyuan-Video的开源,为视频内容创作者、研究人员和开发者提供了一个强大的工具,以实现更高效、更高质量的视频生成,推动了AI视频技术的发展。

-

官方网站:https://aivideo.hunyuan.tencent.com/

论文地址:HunyuanVideo/assets/hunyuanvideo.pdf at main · Tencent/HunyuanVideo · GitHub

github地址:GitHub - Tencent/HunyuanVideo: HunyuanVideo: A Systematic Framework For Large Video Generation Model Training

hugging face权重:https://huggingface.co/tencent/HunyuanVideo/tree/main

效果体验地址: 腾讯混元文生视频

-

-

2.效果展示

美女,御姐,精致五官,高颜值,银色长发,红色古装,伤感,相思,花瓣飘落

真实画面,电影镜头。一枚巨大的炸弹在高空爆炸,火焰和碎片四溅。摄像机从低角度向上拍摄炸弹爆炸的瞬间,火光照亮了战场的废墟。一个小男孩士兵的脸在爆炸的光芒中显得惊恐万分,尘土和火焰在他身后交织,场面令人震惊。

四人在下潜至烟囱区时发现深度远超预期,装备不足以支持他们继续下潜,因此决定返回水面。深海洞穴,四名潜水员。

一只戴着眼镜的熊猫在大学讲堂上讲课,切换到学生,各种动物学生在专注地学习,写实风格 Natural lighting  Wide shot  Lively  Modern Style  Handheld Static  Cinematic  High detailed

-

-

3.论文详解

介绍

与图像生成模型社区相反,开源和闭源视频生成模型之间出现了显著的差距。闭源模型往往会隐藏其实现方法,这严重限制了公共社区算法创新的潜力。为了缩小现有的差距,提高开源社区的能力,这篇报告介绍了开源基础视频生成模型,Hunyuan-Video。

作者对hunyuan-video与全球领先的视频生成模型进行了全面的比较,包括Gen-3和Luma 1.6以及中国三个表现最好的商业模型。结果表明,Hunyuan-Video达到了最高的整体满意度,特别是在运动动力学方面。

-

数据

数据采集

作者使用图像-视频联合训练策略。其中视频被精心分为五个不同的组,而图像被分为两组,每组都是根据各自训练过程的具体要求而定制的。

原始数据池最初包括广泛素材的视频,包括人物,动物,植物,景观,车辆,物体,建筑物和动画。每个视频的采集都有一组基本阈值,包括最低持续时间要求。此外,数据的子集是基于更严格的标准收集的,例如空间质量,遵守特定的长宽比,以及构图,颜色和曝光的专业标准。这些严格的标准确保收集的视频具有技术质量和美学吸引力。

-

数据过滤

作者采用了一系列技术来预处理原始数据。

  1. 首先,作者利用PySceneDetect将原始视频拆分为单镜头视频剪辑。
  2. 接下来,作者使用OpenCV中的拉普拉斯算子来识别一个清晰的帧,作为每个视频剪辑的起始帧。
  3. 使用内部VideoCLIP模型,我们计算这些视频剪辑的Embedding。这些Embedding有两个目的:
    1. 基于Embedding的余弦距离对相似片段进行重复数据删除;
    2. 我们应用k-means 来获得10000个概念质心,用于重新排序和平衡。

为了不断增强视频美学、运动和概念范围,作者实现了一个分层数据过滤管道来构建训练数据集。

  • 使用Dover从美学和技术角度评估视频剪辑的视觉美学
  • 此外,作者还训练了一个模型来确定清晰度并消除具有视觉模糊的视频片段
  • 通过使用估计的光流预测视频的运动速度,过滤掉静态或慢动作视频。
  • 将PySceneDetect和Transnet v2的结果结合以获得场景边界信息
  • 利用内部的OCR模型来删除带有过多文本的视频片段,以及定位和裁剪字幕。
  • 作者还开发了类似YOLOX的视觉模型来检测和删除一些遮挡或敏感信息,例如水印、边框和徽标。
  • 为了评估这些过滤器的有效性,我们使用较小的hunyuan-video模型进行简单的实验并观察性能变化。

作者针对视频数据的分层数据过滤管道产生了五个训练数据集,对应于五个训练阶段。这些数据集(除了最后一个微调数据集)通过逐步提高上述过滤器的阈值来管理。随着视频空间分辨率逐渐增加,作者对过滤器的筛选级别也逐级变严。

为了提高模型在最后阶段的性能(第4.7节),作者构建了一个微调数据集,包括1000万个样本。该数据集通过人工注释精心策划,由具有复杂运动细节的视觉上吸引人的视频剪辑组成。

-

数据标注

图像字幕。字幕的精确性和全面性对提高生成式模型的快速跟踪能力和输出质量起着至关重要的作用。大多数先前的工作集中在提供简短的字幕或密集的字幕。它们都存在信息不完整、话语冗余和不准确等问题。

字幕注释器被分配的任务是识别表现出高视觉美学和引人注目的内容运动的视频剪辑。每个视频片段基于两个视角进行评估:(i)分解的美学视图,包括色彩和谐、照明、对象强调和空间布局;(ii)分解的运动视图,包括运动速度、动作完整性和运动模糊。

为了实现更全面、更高信息密度和更高准确度的字幕,我们开发并实施了一个内部视觉语言模型(VLM),旨在为图像和视频生成结构化字幕。这些结构化的标题,以JSON格式保存,从不同的角度提供多维描述信息,包括:

  1. 简短描述:捕获场景的主要内容。
  2. 密集描述:详细描述场景的内容,其中特别包括与视觉内容相结合的场景过渡和摄像机移动,例如摄像机跟随某个主题。
  3. 背景:描述受试者所处的环境。
  4. 风格:描述视频的风格,如纪录片、电影、写实或科幻。
  5. 镜头类型:标识突出显示或强调特定视觉内容的视频快照类型,如航拍、特写、中摄或远景。
  6. 照明:描述视频的照明条件。
  7. 氛围:传达视频的氛围,如温馨、紧张或神秘。

摄影机移动类型。我们还训练了一个相机运动分类器,能够预测14种不同的相机运动类型,包括放大,缩小,向上摇摄,向下摇摄,向左摇摄,向右摇摄,向上倾斜,向下倾斜,向左倾斜,向右倾斜,向左左右,静态拍摄和手持拍摄。相机运动类型的高置信度预测被集成到JSON格式的结构化字幕中,以实现生成模型的相机运动控制能力。

-

模型结构

3D变分自动编码器

作者训练了一个3D-VAE来将像素空间的视频和图像压缩到一个紧凑的潜在空间中。为了同时处理视频和图像,作者使用了CausalConv3D。对于一个形状为(T + 1) × 3 × H × W的视频,作者的3D VAE将其压缩成形状为(\frac{T}{ c_t} + 1) \times C \times (\frac{H}{ c_s}) \times (\frac{W}{ c_s})的潜在特征。其中,ct = 4,cs = 8,C = 16。

对于这里为什么是T+1,笔者已经在CogVideoX和pyramid-flow里面说过了,这里是对视频第一帧进行单独编码,第一帧作为后续P帧的参考帧,其质量直接影响到同一组中以后各帧的质量。因此,单独编码第一帧可以确保视频序列的质量和准确性。

参考:CogVideoX-5b及I2V详解(包含代码、论文详解)-CSDN博客

参考:Pyramid Flow:北大开源的视频生成模型-CSDN博客 

训练:作者从头开始训练3DVAE,而非常见的预训练后微调。为了平衡视频和图像的重建质量,作者以4:1的比例混合视频和图像数据。除了常规使用的L1重建损失和KL损失Lkl之外,作者还结合了感知损失Llpips和GAN对抗损失Ladv以提高重建质量。

Loss = L_1+0.1L_{lpips}+0.05L_{adv}+10^{-6}L_{kl}

在训练过程中,作者采用从低分辨率的短视频逐渐变化到高分辨率的长视频的策略。为了改善高运动视频的重建,作者从1到8的范围内随机选择一个采样间隔,以便在视频剪辑中均匀地对帧进行采样。

推理:在单个GPU上编码和解码高分辨率长视频可能导致内存不足(OOM)错误。为了解决这个问题,作者使用空间-时间平铺(Tiling)策略,将输入视频沿空间和时间维度分割成重叠的平铺。每个平铺分别编码/解码,然后输出结果拼接在一起。对于重叠区域,使用线性组合进行混合。这种平铺策略使我们能够在单个GPU上编码/解码任意分辨率和时长的视频。

作者观察到,在推理过程中直接使用平铺策略可能会由于训练和推理之间的不一致而导致可见的伪影。为了解决这个问题,我们引入了一个额外的微调阶段,在训练过程中随机启用/禁用平铺策略。这确保了模型与平铺和非平铺策略兼容,保持了训练和推理之间的一致性。

-

DiT

本节介绍了HunyuanVideo中的Transformer设计,它采用了统一的全注意力机制,主要原因有三:

  1. 它比分割的时空注意力表现出更优越的性能。
  2. 它支持图像和视频的统一生成,简化了训练过程并提高了模型的可扩展性。
  3. 它更有效地利用了现有的大型语言模型(LLM)相关的加速能力,提高了训练和推理效率。模型结构如下图所示。

-

输入:对于给定的视频-文本对:

  • 对于视频分支,输入首先被压缩成形状为T × C × H × W的潜在空间。为了统一输入处理,作者将图像视为单帧视频。然后使用内核大小为k_t\times k_h\times k_w的3D卷积分割成形状为\frac{T}{k_t}\cdot \frac{H}{k_h}\cdot \frac{W}{k_w}的1D token序列。
  • 对于文本分支,首先使用大语言模型将文本编码成一系列包含细粒度语义信息的embedding。同时,使用CLIP模型提取包含全局信息的文本向量,这种表示随后在维度上扩展,并添加到时间步,然后嵌入模型。

模型设计:为了有效地整合文本和视觉信息,作者使用了“双流到单流”混合模型设计的策略。

  • 双流阶段,视频和文本token通过多个Transformer块独立处理,使每种模态都能在无干扰的情况下学习自己的适当调制机制。
  • 单流阶段,将视频和文本token连接起来,并将它们输入后续的Transformer块以实现有效的多模态信息融合。这种设计捕捉了视觉和语义信息之间复杂的交互,增强了整体模型性能。

位置Embedding:为了支持多分辨率、多宽高比和不同持续时间的生成,作者将RoPE扩展到三个维度。具体来说,作者分别为时间(T)、高度(H)和宽度(W)的坐标计算旋转频率矩阵。然后我们将查询和键的特征通道分割成三个部分(dt,dh,dw),将每个部分乘以相应的坐标频率并连接这些部分。这个过程产生了位置感知的查询和键嵌入,用于注意力计算。

 这里可以参考CogVideoX的3D-RoPE:CogVideoX-5b及I2V详解(包含代码、论文详解)-CSDN博客

-

文本编码器

在文本到图像和文本到视频等生成任务中,文本编码器通过在潜在空间中提供指导信息发挥着关键作用。

  1. 作者遵循现有研究方案引入了大语言模型作为文本特征提取器以增强文本特征。在每种设置下,MLLMs都比传统文本编码器表现出更优越的性能。
  2. 此外,CLIP文本特征作为文本信息的摘要也很有价值。作者采用CLIP-Large文本特征的最后一个非填充令牌作为全局指导,整合到双流和单流DiT块中

-

模型预训练

损失函数

作者使用Flow Matching进行模型训练,并将训练过程分为多个阶段。首先在256px和512px的图像上预训练我们的模型,然后从256px到960px的图像和视频上进行联合训练。

Flow Matching通过一系列变量转换将复杂的概率分布转换为简单的概率分布,并通过对数转换生成新数据样本。

在训练过程中,给定训练集中的图像或视频潜在表示x1,我们首先从对数正态分布[21]中采样t ∈ [0, 1]并初始化噪声x0 ∼ N(0, I),按照高斯分布。训练样本xt然后使用线性插值方法构建。模型被训练以预测速度ut = dxt/dt,指导样本xt朝向样本x1。通过最小化预测速度vt和真实速度ut之间的均方误差来优化模型参数,表示为损失函数:

L_{generation} = E_{t,x_0,x_1} \left \| v_t -u_t \right \|^2

在推理过程中,初始采样噪声样本x0 ∼ N(0, I)。然后使用一阶欧拉常微分方程(ODE)求解器通过对模型的dxt/dt估计值进行积分来计算x1。该过程最终生成最终样本x1。

-

图像预训练

作者引入了一个两阶段的渐进式图像预训练策略,作为视频训练的热身。

  1. 图像阶段1(256px训练):模型首先使用低分辨率256px图像进行预训练。具体来说,作者使用不同的宽高比训练,这有助于模型学习在不牺牲文本图像对齐的情况下生成具有广泛宽高比的图像。同时,使用低分辨率样本进行预训练使模型能够从更多的样本中学习低频概念。
  2. 图像阶段2(混合比例训练):作者发现在512px图像上微调后的模型在256px图像生成上的性能会严重下降,这可能影响到随后基于256px视频的视频预训练。因此,作者进行了混合比例训练其中每个训练全局批次包括两个或更多比例的多宽高比集合。每个比例都有一个锚定大小,然后基于锚定大小构建多宽高比 集合。作者在两个比例的数据集上训练模型,学习更高分辨率的图像,同时保持对低分辨率的能力。作者还引入了动态批次大小,用于具有不同图像比例的微批次,最大化GPU内存和计算利用率。

-

视频图像联合训练

数据过滤过程之后,视频具有不同的宽高比和持续时间。作者创建了BT个持续时间 集合和BAR个宽高比 集合,总共有BT × BAR个 集合。由于不同 集合中的令牌数量不同,作者为每个 集合分配了一个最大批次大小,以防止内存不足(OOM)错误,以优化GPU资源利用。在训练之前,所有数据都被分配到最近的 集合。在训练过程中,每个等级随机预提取 集合数据。这种随机选择确保模型在每一步都训练不同大小的数据,这有助于通过避免仅在单一大小上训练的限制来保持模型泛化。

-

渐进式视频-图像联合训练

直接从文本生成高质量、长时序的视频序列通常会导致模型收敛困难和次优结果。因此,渐进式学习已成为训练文本到视频模型的广泛采用策略。

  1. 低分辨率、短视频阶段。模型建立了文本和视觉内容之间的基本映射,确保短期动作的一致性和连贯性。

  2. 低分辨率、长视频阶段。模型学习更复杂的时间动态和场景变化,确保在更长时间内保持时间和空间的一致性。

  3. 高分辨率、长视频阶段。模型提高了视频分辨率和细节质量,同时保持时间连贯性,并管理复杂的时间动态。

此外,在每个阶段,我们都以不同的比例纳入图像进行视频-图像联合训练。这种方法解决了高质量视频数据的稀缺性,使模型能够学习更广泛和多样化的世界知识。它还有效地避免了由于视频和图像数据之间的分布差异而导致的图像空间语义的灾难性遗忘。

-

Prompt重写

为了解决用户提供的提示在语言风格和长度上的变化,作者使用Hunyuan-Large模型作为提示重写模型,以适应原始用户提示到模型偏好的提示。这个提示重写模块的关键功能如下:

  • 多语言输入适应:该模块旨在处理和理解各种语言的用户提示,确保保留意义和上下文。

  • 标准化提示结构:该模块重述提示,使其符合标准化的信息架构,类似于训练中的字幕。

  • 简化复杂术语:该模块将复杂的用户措辞简化为更直接的表达方式,同时保持用户的原始意图。

-

高性能模型微调

作者仔细选择了整个数据集中的四个特定子集进行微调。这些子集首先使用自动化数据过滤技术进行筛选,然后进行手动审查。然后使用这个数据集进行微调。

-

-

应用

视频到音频(V2A)

视频到音频(V2A)模块旨在通过整合同步的声音效果和上下文适当的背景音乐来增强生成的视频内容。

数据:作者构建了一个由视频-文本对组成的视频数据集。然而,并非该数据集中的所有数据都适合用于训练V2A模型。例如,一些视频缺乏音频流,其他视频包含大量的画外音内容,或者它们的环境音轨已被删除并替换为不相关的元素。为了应对这些挑战并确保数据质量,作者设计了一个专为V2A量身定制的强大数据过滤管道。

  1. 过滤掉没有音频流的视频或静音率超过80%的视频。
  2. 采用帧级音频检测模型来检测音频流中的语音,音乐和一般声音。基于这种分析,我们将数据分为四个不同的类别:纯声音,语音,音乐和纯音乐。
  3. 为了优先考虑高质量的数据,我们训练了模型来计算视听一致性得分,该得分量化了每个视频的视觉和听觉分量之间的对齐。使用这个评分系统结合音频类别标签,我们系统地从每个类别中抽取部分数据,从原始数据集中保留大约250,000小时用于预训练。
  4. 对于监督微调阶段,我们进一步完善我们的选择,策划数百万个高质量剪辑的子集(80,000小时)。

-

模型结构

  • 该模型运行在一个经过梅尔频谱图训练的变分自动编码器(VAE)上。音频波形首先被转换成2D梅尔频谱图表示。随后使用预训练的VAE将该频谱图编码到潜在空间中
  • 对于特征提取,我们利用预训练的CLIP和T5编码器分别独立地提取视觉和文本特征。这些特征随后使用独立的线性投影投影到DiT兼容的潜在空间中,然后进行SwiGLU激活
  • 为了有效地整合多模态信息,我们采用了堆叠的三流Transformer块,独立处理视觉,音频和文本模态。这些之后是单流Transformer块,以确保跨模态的无缝融合和对齐。这种设计增强了音频-视频和音频-文本表示之间的对齐,促进了多模态一致性的改善。
  • 最后,使用预先训练的HifiGAN声码器将梅尔频谱图转换回音频波形。该框架确保了音频信号的高保真度重建,同时保持了强大的多模态对齐。

-

图生视频

图像到视频(I2V)任务是视频生成任务中的常见应用。它通常意味着给定一个图像和一个字幕,模型使用该图像作为第一帧来生成与字幕匹配的视频

为了引入图像条件I,我们将I视为视频的第一帧,并应用零填充来创建T ×C ×H ×W张量Io。此外,我们采用维度为T × 1 ×H ×W的二进制掩码m,其中第一个时间位置设置为1,所有其他位置设置为零。然后,潜在X、张量Io和掩码m沿通道维度被连接沿着以形成模型的输入。

微调

  • 作者对200万个肖像视频进行了I2V模型的监督微调,以增强人体运动和整体美学。还应用人脸和身体检测器来过滤超过五个人的训练视频。我们还删除了主要主题较小的视频。最后,对剩余的视频进行人工检查,以获得最终的高质量人像训练数据集。
  • 在训练方面,我们采用渐进式微调策略,逐步解冻各个层的模型参数,同时在微调期间保持其余参数冻结。这种方法允许模型在肖像领域实现高性能,而不会损害其固有的泛化能力,保证在自然景观,动物和植物领域的出色性能。
  • 此外,我们的模型还支持视频插值使用的第一个和最后一个帧作为条件。
  • 我们在训练过程中以一定的概率随机丢弃文本条件,以提高模型的性能。

-

音频/姿势/表情控制

hunyuan-video在多个方面赋能可控的化身动画。它允许使用显式驱动信号(例如,语音信号、表情模板和姿势模板)。此外,它还集成了使用文本提示的隐式驱动范式。

为了保持严格的外观一致性,作者通过插入参考图像的潜像作为强指导来修改浑源视频架构。如图(B,C)所示,我们使用3DVAE对参考图像进行编码,得到z^{ref} \in R^{1\times c\times h\times w},其中c = 16。然后,我们沿着时间维度重复t次,并在信道维度上与zt级联,以得到修改的噪声输入\hat{z_{t}} \in R^{t\times2 c\times h\times w}。为了实现可控的动画,采用了各种适配器。我们在下面描述它们。

基于音频信号生成视频

我们的模型可以自适应地预测数字人的面部表情和姿势动作信息。这允许被驱动的角色带着情感和表情说话,增强了数字人的表现力和真实感。

如图所示,对于单个音频信号驱动部分,音频经过耳语特征提取模块得到音频特征,然后以交叉注意的方式注入到主网络中。需要注意的是,注入过程将乘以面罩来控制音频的效果区域。在增强头肩控制能力的同时,也会大大降低身体变形的概率。

为了获得更生动的头部运动,我们引入头部姿势运动参数和表情运动参数,并以Embedding方式添加到时间步长中。在训练过程中,头部运动参数由鼻尖关键点序列的方差给出,表情参数由面部关键点序列的方差给出。

-

Pose-Driven

我们可以使用姿势模板明确地控制数字角色的身体动作。

我们使用Dwpose从任何源视频中检测骨架视频,并使用3DVAE将其转换为z_{pose}的潜在空间。我们认为,这简化了微调过程,因为输入和驱动视频都是图像表示,并与共享VAE编码,从而产生相同的潜在空间。然后,我们通过逐元素添加将驱动信号注入到模型中,作为\hat{z}_t + z_{pose}。注意,\hat{z}_t包含参考图像的外观信息。我们使用预训练T2V权重的全参数微调作为初始化

-

Expression-Driven

我们还可以使用隐式表情表示来控制数字角色的面部表情。

我们使用隐式表示作为其ID和表达式解纠缠能力的驱动信号。在这项工作中,我们使用VASA 作为表情提取器。如图(c)所示,我们采用轻量级表达式编码器将表情表示转换为潜在空间中的令牌序列,如z_{exp} \in R^{t\times n\times c},其中n是每帧令牌的数量。通常,我们设置n = 16。与姿势条件不同,我们使用交叉注意注入z_{exp},因为\hat{z}z_{exp}在空间方面并不自然对齐。我们在每K个双流和单流DiT层中添加交叉注意层Attnexp(q,k,v)来注入表情潜在空间将第i个DiT层之后的隐藏状态表示为h_{i},表达式z_{exp}h_{i}的注入可以导出为h_i+Attn_{exp}(h_i,z_{exp},z_{exp})*M_{face},其中M_{face}是引导z_{exp}应该应用于何处的面部区域掩码,并且代表逐元素乘法。

-

Hybrid Condition Driven

结合姿态和表情驱动的策略,我们可以得到混合控制方法。在该场景中,身体运动由显式骨骼姿态序列控制,面部表情由隐式表情表示确定。作者共同微调T2V模块,并以端到端的方式添加模块。在推理过程中,身体运动和面部运动可以由单独的驱动信号控制,赋予更丰富的可编辑性。

-

-

4.代码详解

环境安装

下载源码,按照以下步骤配置conda环境。

需要注意的是:flash attention跟你使用的torch版本有关,如果flash attention报错,请确认安装的版本是否符合torch版本。

# 1. Prepare conda environment
conda env create -f environment.yml

# 2. Activate the environment
conda activate HunyuanVideo

# 3. Install pip dependencies
python -m pip install -r requirements.txt

# 4. Install flash attention v2 for acceleration (requires CUDA 11.8 or above)
python -m pip install git+https://github.com/Dao-AILab/flash-attention.git@v2.5.9.post1

当然,你也可以直接使用官方提供的docker

# 1. Use the following link to download the docker image tar file (For CUDA 12).
wget https://aivideo.hunyuan.tencent.com/download/HunyuanVideo/hunyuan_video_cu12.tar

# 2. Import the docker tar file and show the image meta information (For CUDA 12).
docker load -i hunyuan_video.tar

docker image ls

# 3. Run the container based on the image
docker run -itd --gpus all --init --net=host --uts=host --ipc=host --name hunyuanvideo --security-opt=seccomp=unconfined --ulimit=stack=67108864 --ulimit=memlock=-1 --privileged  docker_image_tag

-

最后别忘了下载权重:

 hugging face权重:https://huggingface.co/tencent/HunyuanVideo/tree/main

-

下载量化版本

量化版本:https://huggingface.co/Kijai/HunyuanVideo_comfy/tree/main

  • 包括fp8本体和vae模型(一共4个,只需选其中2个适当的下载即可)
  • 注意按需下载,建议都下载体积小的那个
  • 下载成功后,分别放入 models/diffusion_models 目录 和 models/vae 目录即可

接着下载LLM:https://huggingface.co/Kijai/llava-llama-3-8b-text-encoder-tokenizer

把这个模型放入 models/LLM 目录

下载 clip 模型https://huggingface.co/openai/clip-vit-large-patch14

把这个模型放到models/clip目录 

-

使用

cd HunyuanVideo

python3 sample_video.py \
    --video-size 720 1280 \
    --video-length 129 \
    --infer-steps 50 \
    --prompt "A cat walks on the grass, realistic style." \
    --flow-reverse \
    --use-cpu-offload \
    --save-path ./results

其他参数

ArgumentDefaultDescription
--promptNone视频生成提示词
--video-size720 1280视频尺寸,最小256p
--video-length129时长(帧)
--infer-steps50采样步数
--embedded-cfg-scale6.0Embeded Classifier free guidance scale,自由引导比例
--flow-shift7.0Shift factor for flow matching schedulers,移位系数
--flow-reverseFalse如果设置为True,则从 t=1 -> t=0 开始学习/采样
--seedNone生成视频的随机数种子,如果不设置会随机生成一个
--use-cpu-offloadFalse使用 CPU 卸载模型加载,以节省更多内存,这是生成高分辨率视频所必需的
--save-path./results保存路径

-

sample_video.py

这个文件里定义了一个main函数,用于加载模型并生成视频样本,是一个包含整体流程的文件,这里就不细讲了。

def main():
    args = parse_args()     # 解析命令行参数
    print(args)
    models_root_path = Path(args.model_base)    # 检查模型路径
    if not models_root_path.exists():
        raise ValueError(f"`models_root` not exists: {models_root_path}")
    
    # Create save folder to save the samples    创建保存目录
    save_path = args.save_path if args.save_path_suffix=="" else f'{args.save_path}_{args.save_path_suffix}'
    if not os.path.exists(args.save_path):
        os.makedirs(save_path, exist_ok=True)

    # Load models   # 加载模型
    hunyuan_video_sampler = HunyuanVideoSampler.from_pretrained(models_root_path, args=args)
    
    # Get the updated args
    args = hunyuan_video_sampler.args

    # Start sampling
    # TODO: batch inference check
    outputs = hunyuan_video_sampler.predict(    # 生成视频样本
        prompt=args.prompt, 
        height=args.video_size[0],
        width=args.video_size[1],
        video_length=args.video_length,
        seed=args.seed,
        negative_prompt=args.neg_prompt,
        infer_steps=args.infer_steps,
        guidance_scale=args.cfg_scale,
        num_videos_per_prompt=args.num_videos,
        flow_shift=args.flow_shift,
        batch_size=args.batch_size,
        embedded_guidance_scale=args.embedded_cfg_scale
    )
    samples = outputs['samples']
    
    # Save samples
    for i, sample in enumerate(samples):
        sample = samples[i].unsqueeze(0)
        time_flag = datetime.fromtimestamp(time.time()).strftime("%Y-%m-%d-%H:%M:%S")
        save_path = f"{save_path}/{time_flag}_seed{outputs['seeds'][i]}_{outputs['prompts'][i][:100].replace('/','')}.mp4"
        save_videos_grid(sample, save_path, fps=24)     # 保存视频样本
        logger.info(f'Sample save to: {save_path}')

-

模型初始化

# Load models   # 加载模型
hunyuan_video_sampler = HunyuanVideoSampler.from_pretrained(models_root_path, args=args)

代码中加载模型这里,我们调用HunYuanVideoSampler的from_pretrained()初始化实例

实际上通过查看hyvideo/inference.py文件,我们可知,该方法实际上是其父类Inference实现的,功能包括初始化vae、text_encoder、扩散模型等核心部件,然后通过cls初始化并返回一个实例,而HunYuanVideoSampler类继承了父类的from_pretrained()方法,因此这里cls返回的是HunYuanVideoSampler的实例

为了方便查看,我删去了部分代码:

class Inference(object):
    ...
    @classmethod
    def from_pretrained(cls, pretrained_model_path, args, device=None, **kwargs):
        ...

        in_channels = args.latent_channels      # 16
        out_channels = args.latent_channels     # 16

        model = load_model(     # HYVideoDiffusionTransformer
            args,
            in_channels=in_channels,
            out_channels=out_channels,
            factor_kwargs=factor_kwargs,
        )

        ...

        # VAE
        vae, _, s_ratio, t_ratio = load_vae(     # AutoencoderKLCausal3D
            args.vae,
            args.vae_precision,
            logger=logger,
            device=device if not args.use_cpu_offload else "cpu",
        )

        # Text encoder
        if args.prompt_template_video is not None:      # 走这里
            crop_start = PROMPT_TEMPLATE[args.prompt_template_video].get(   # 95
                "crop_start", 0
            )
        ...
        max_length = args.text_len + crop_start     # 256+95

        # prompt_template
        prompt_template = (
            PROMPT_TEMPLATE[args.prompt_template]
            if args.prompt_template is not None
            else None
        )

        # prompt_template_video
        prompt_template_video = (
            PROMPT_TEMPLATE[args.prompt_template_video]
            if args.prompt_template_video is not None
            else None
        )

        text_encoder = TextEncoder(     # llm
            text_encoder_type=args.text_encoder,        # llm
            max_length=max_length,
            text_encoder_precision=args.text_encoder_precision,
            tokenizer_type=args.tokenizer,
            prompt_template=prompt_template,
            prompt_template_video=prompt_template_video,
            hidden_state_skip_layer=args.hidden_state_skip_layer,
            apply_final_norm=args.apply_final_norm,
            reproduce=args.reproduce,
            logger=logger,
            device=device if not args.use_cpu_offload else "cpu",
        )
        text_encoder_2 = None
        if args.text_encoder_2 is not None:
            text_encoder_2 = TextEncoder(
                text_encoder_type=args.text_encoder_2,      # clipL
                max_length=args.text_len_2,
                text_encoder_precision=args.text_encoder_precision_2,
                tokenizer_type=args.tokenizer_2,
                reproduce=args.reproduce,
                logger=logger,
                device=device if not args.use_cpu_offload else "cpu",
            )

        return cls(     # 初始化本类的一个实例
            args=args,
            vae=vae,        # AutoencoderKLCausal3D
            vae_kwargs=vae_kwargs,
            text_encoder=text_encoder,      # llm
            text_encoder_2=text_encoder_2,
            model=model,
            use_cpu_offload=args.use_cpu_offload,
            device=device,
            logger=logger,
        )

因为最后使用了cls(),也就会调用HunyuanVideoSampler的初始化方法__init__():具体来说,这里就是指定所有组件并将他们组合到一个pipeline,然后指定负面提示词。

class HunyuanVideoSampler(Inference):
    def __init__(...):
        super().__init__(...)

        self.pipeline = self.load_diffusion_pipeline(       # 组合所有原件
            args=args,
            vae=self.vae,
            text_encoder=self.text_encoder,
            text_encoder_2=self.text_encoder_2,
            model=self.model,
            device=self.device,
        )

        self.default_negative_prompt = NEGATIVE_PROMPT      # 负面提示词

其中,load_diffusion_pipeline()如下

def load_diffusion_pipeline(...):
    """Load the denoising scheduler for inference."""
    if scheduler is None:
        if args.denoise_type == "flow":
            scheduler = FlowMatchDiscreteScheduler(     # 创建 FlowMatchDiscreteScheduler
                shift=args.flow_shift,      # 默认7.0
                reverse=args.flow_reverse,  # True
                solver=args.flow_solver,    # 默认"euler"
            )
        else:
            raise ValueError(f"Invalid denoise type {args.denoise_type}")

    pipeline = HunyuanVideoPipeline(    # 创建管道,父类是DiffusionPipeline
        vae=vae,
        text_encoder=text_encoder,
        text_encoder_2=text_encoder_2,
        transformer=model,
        scheduler=scheduler,
        progress_bar_config=progress_bar_config,
        args=args,
    )
    if self.use_cpu_offload:
        pipeline.enable_sequential_cpu_offload()
    else:
        pipeline = pipeline.to(device)

    return pipeline

-

混元提供的提示词如下:

PROMPT_TEMPLATE_ENCODE = (
    "<|start_header_id|>system<|end_header_id|>\n\nDescribe the image by detailing the color, shape, size, texture, "
    "quantity, text, spatial relationships of the objects and background:<|eot_id|>"
    "<|start_header_id|>user<|end_header_id|>\n\n{}<|eot_id|>"
) 
PROMPT_TEMPLATE_ENCODE_VIDEO = (
    "<|start_header_id|>system<|end_header_id|>\n\nDescribe the video by detailing the following aspects: "
    "1. The main content and theme of the video."
    "2. The color, shape, size, texture, quantity, text, and spatial relationships of the objects."
    "3. Actions, events, behaviors temporal relationships, physical movement changes of the objects."
    "4. background environment, light, style and atmosphere."
    "5. camera angles, movements, and transitions used in the video:<|eot_id|>"
    "<|start_header_id|>user<|end_header_id|>\n\n{}<|eot_id|>"
)  

NEGATIVE_PROMPT = "Aerial view, aerial view, overexposed, low quality, deformation, a poor composition, bad hands, bad teeth, bad eyes, bad limbs, distortion"

PROMPT_TEMPLATE = {
    "dit-llm-encode": {
        "template": PROMPT_TEMPLATE_ENCODE,
        "crop_start": 36,
    },
    "dit-llm-encode-video": {
        "template": PROMPT_TEMPLATE_ENCODE_VIDEO,
        "crop_start": 95,
    },
}

模型生成

主文件中:

outputs = hunyuan_video_sampler.predict()

这里的主要内容就是处理prompt、视频帧数、宽和高等信息,然后调用self.pipeline()进行处理。

@torch.no_grad()
def predict(...):
    ...
    
    generator = [torch.Generator(self.device).manual_seed(seed) for seed in seeds]
    out_dict["seeds"] = seeds

    ...

    target_height = align_to(height, 16)
    target_width = align_to(width, 16)
    target_video_length = video_length

    out_dict["size"] = (target_height, target_width, target_video_length)

    ...

    # negative prompt
    if negative_prompt is None or negative_prompt == "":
        negative_prompt = self.default_negative_prompt
    if not isinstance(negative_prompt, str):
        raise TypeError(
            f"`negative_prompt` must be a string, but got {type(negative_prompt)}"
        )
    negative_prompt = [negative_prompt.strip()]

    scheduler = FlowMatchDiscreteScheduler(     # 调度器配置
        shift=flow_shift,
        reverse=self.args.flow_reverse,
        solver=self.args.flow_solver
    )
    self.pipeline.scheduler = scheduler

    freqs_cos, freqs_sin = self.get_rotary_pos_embed(       # 位置编码生成
        target_video_length, target_height, target_width
    )
    n_tokens = freqs_cos.shape[0]

    ...

    samples = self.pipeline(
        prompt=prompt,
        height=target_height,
        width=target_width,
        video_length=target_video_length,
        num_inference_steps=infer_steps,
        guidance_scale=guidance_scale,
        negative_prompt=negative_prompt,
        num_videos_per_prompt=num_videos_per_prompt,
        generator=generator,
        output_type="pil",
        freqs_cis=(freqs_cos, freqs_sin),
        n_tokens=n_tokens,
        embedded_guidance_scale=embedded_guidance_scale,
        data_type="video" if target_video_length > 1 else "image",
        is_progress_bar=True,
        vae_ver=self.args.vae,
        enable_tiling=self.args.vae_tiling,
    )[0]
    out_dict["samples"] = samples
    out_dict["prompts"] = prompt

    return out_dict

-

HunyuanVideoPipeline.__call__()

samples = self.pipeline

上述代码实际上调用的是HunyuanVideoPipeline的__call__()方法,位置在diffusers库下的pipeline_hunyuan_video.py文件

其主要过程如下:

  1. 参数检查(对应官方注释0-2):检查输入参数的有效性。没什么可说的,就不细讲了。
  2. 编码提示词(对应官方注释3):分别用llm和clipL(如果有)将正负文本提示转换为嵌入向量。
  3. 准备时间步(对应官方注释4):设置时间步长用于去噪过程。
  4. 准备潜在变量(对应官方注释5):初始化潜在变量。
  5. 去噪循环(对应官方注释7):通过多次迭代逐步减少噪声,生成最终的图像或视频帧。
  6. 解码潜在变量(对应官方注释7):将潜在变量解码为最终的图像或视频。
  7. 返回结果:根据 output_type 返回生成的图像或视频。

-

编码提示词

这里就是使用大语言模型和clip对正负提示词进行编码

# 3. Encode input prompt  分别用llm和clipL对文本进行编码
lora_scale = (
    self.cross_attention_kwargs.get("scale", None)
    if self.cross_attention_kwargs is not None
    else None
)

(
    prompt_embeds,
    negative_prompt_embeds,
    prompt_mask,
    negative_prompt_mask,
) = self.encode_prompt(     # 将输入的文本提示(prompt)编码为文本编码器(llm)的隐藏状态。
    prompt,
    device,
    num_videos_per_prompt,
    self.do_classifier_free_guidance,
    negative_prompt,
    prompt_embeds=prompt_embeds,
    attention_mask=attention_mask,
    negative_prompt_embeds=negative_prompt_embeds,
    negative_attention_mask=negative_attention_mask,
    lora_scale=lora_scale,
    clip_skip=self.clip_skip,       # 没写text_encoder就是默认llm
    data_type=data_type,
)
if self.text_encoder_2 is not None:
    (
        prompt_embeds_2,
        negative_prompt_embeds_2,
        prompt_mask_2,
        negative_prompt_mask_2,
    ) = self.encode_prompt(
        prompt,
        device,
        num_videos_per_prompt,
        self.do_classifier_free_guidance,
        negative_prompt,
        prompt_embeds=None,
        attention_mask=None,
        negative_prompt_embeds=None,
        negative_attention_mask=None,
        lora_scale=lora_scale,
        clip_skip=self.clip_skip,
        text_encoder=self.text_encoder_2,   # clipL
        data_type=data_type,
    )
else:
    prompt_embeds_2 = None
    negative_prompt_embeds_2 = None
    prompt_mask_2 = None
    negative_prompt_mask_2 = None

除此之外,还会把编码和掩码信息concat到一起

if self.do_classifier_free_guidance:     # 7.5>1,默认会执行,将正负提示和掩码进行拼接
    prompt_embeds = torch.cat([negative_prompt_embeds, prompt_embeds])
    if prompt_mask is not None:
        prompt_mask = torch.cat([negative_prompt_mask, prompt_mask])
    if prompt_embeds_2 is not None:
        prompt_embeds_2 = torch.cat([negative_prompt_embeds_2, prompt_embeds_2])
    if prompt_mask_2 is not None:
        prompt_mask_2 = torch.cat([negative_prompt_mask_2, prompt_mask_2])

获取时间步长

这里的目的是从调度器(scheduler)中获取时间步长

# 4. Prepare timesteps
extra_set_timesteps_kwargs = self.prepare_extra_func_kwargs(
    self.scheduler.set_timesteps, {"n_tokens": n_tokens}
)
timesteps, num_inference_steps = retrieve_timesteps(    # 从调度器(scheduler)中获取时间步长
    self.scheduler,
    num_inference_steps,
    device,
    timesteps,
    sigmas,
    **extra_set_timesteps_kwargs,
)

准备潜在变量

# 5. Prepare latent variables
num_channels_latents = self.transformer.config.in_channels
latents = self.prepare_latents(
    batch_size * num_videos_per_prompt,
    num_channels_latents,
    height,
    width,
    video_length,
    prompt_embeds.dtype,
    device,
    generator,
    latents,
)

其中的关键步骤:

if latents is None:
    latents = randn_tensor(
        shape, generator=generator, device=device, dtype=dtype      # 调用 randn_tensor 生成随机噪声
    )
else:
    latents = latents.to(device)

randn_tensor()中具体生成步骤:使用torch.randn()生成一个随机张量 latents,其形状由 shape 参数指定。生成的随机数基于标准正态分布,并且可以通过 generator 参数指定随机数生成器。

else:
    latents = torch.randn(shape, generator=generator, device=rand_device, dtype=dtype, layout=layout).to(device)    # 这些数对于每次使用相同的种子值时都是相同的。

去噪

下面是去噪过程的完整代码,具体功能如下:

  1. 初始化进度条:使用 self.progress_bar 初始化一个进度条,总步数为 num_inference_steps。
  2. 扩展潜在变量:
    1. 扩展潜在变量:根据 do_classifier_free_guidance 参数,决定是否对潜在变量 latents 进行扩展。如果启用,则将 latents 复制一份并拼接;否则保持不变。
    2. 添加时间步长:调用 scheduler.scale_model_input 方法,将时间步长 t 添加到扩展后的潜在变量中。
    3. 扩展时间步长和指导权重:将时间步长 t 扩展为与潜在变量形状相同的张量,并根据 embedded_guidance_scale 计算指导权重。
  3. 预测噪声残差:使用 self.transformer 预测噪声残差 noise_pred。
  4. 执行引导:如果启用了分类器自由引导,对噪声残差进行引导处理。
    1. 如果启用了分类器自由引导(self.do_classifier_free_guidance为True),则将噪声预测结果noise_pred拆分为无条件部分noise_pred_uncond和有条件部分noise_pred_text。
    2. 然后,根据引导比例self.guidance_scale调整噪声预测结果,使其偏向有条件部分。
  5. 调整噪声:如果启用了引导重缩放 (guidance_rescale),进一步调整噪声。
    1. 如果同时启用了分类器自由引导和噪声预测的重新缩放(self.guidance_rescale大于0.0),则调用rescale_noise_cfg函数对噪声预测结果进行重新缩放。
  6. 计算前一个噪声样本:使用 self.scheduler.step 计算前一个噪声样本 latents。
  7. 回调函数:在特定步骤调用回调函数 callback_on_step_end 和 callback,更新相关变量。
  8. 更新进度条:在特定步骤更新进度条。
# if is_progress_bar:
with self.progress_bar(total=num_inference_steps) as progress_bar:
    for i, t in enumerate(timesteps):
        if self.interrupt:
            continue

        # expand the latents if we are doing classifier free guidance
        latent_model_input = (      # 扩展潜在变量:根据是否启用分类器自由引导 (do_classifier_free_guidance),扩展潜在变量 latents。
            torch.cat([latents] * 2)
            if self.do_classifier_free_guidance
            else latents
        )
        latent_model_input = self.scheduler.scale_model_input(  # 添加时间步长 t
            latent_model_input, t
        )

        t_expand = t.repeat(latent_model_input.shape[0])
        guidance_expand = (
            torch.tensor(
                [embedded_guidance_scale] * latent_model_input.shape[0],
                dtype=torch.float32,
                device=device,
            ).to(target_dtype)
            * 1000.0
            if embedded_guidance_scale is not None
            else None
        )

        # predict the noise residual    # 预测噪声残差
        with torch.autocast(
            device_type="cuda", dtype=target_dtype, enabled=autocast_enabled
        ):
            noise_pred = self.transformer(  # For an input image (129, 192, 336) (1, 256, 256)
                latent_model_input,  # [2, 16, 33, 24, 42]
                t_expand,  # [2]
                text_states=prompt_embeds,  # [2, 256, 4096]
                text_mask=prompt_mask,  # [2, 256]
                text_states_2=prompt_embeds_2,  # [2, 768]
                freqs_cos=freqs_cis[0],  # [seqlen, head_dim]
                freqs_sin=freqs_cis[1],  # [seqlen, head_dim]
                guidance=guidance_expand,
                return_dict=True,
            )[
                "x"
            ]

        # perform guidance
        if self.do_classifier_free_guidance:        # 如果启用了分类器自由引导,对噪声残差进行引导处理。
            noise_pred_uncond, noise_pred_text = noise_pred.chunk(2)
            noise_pred = noise_pred_uncond + self.guidance_scale * (
                noise_pred_text - noise_pred_uncond
            )

        if self.do_classifier_free_guidance and self.guidance_rescale > 0.0:    # 据条件进行噪声预测的重新缩放
            # Based on 3.4. in https://arxiv.org/pdf/2305.08891.pdf
            noise_pred = rescale_noise_cfg(
                noise_pred,
                noise_pred_text,
                guidance_rescale=self.guidance_rescale,
            )

        # compute the previous noisy sample x_t -> x_t-1    计算前一个噪声样本
        latents = self.scheduler.step(
            noise_pred, t, latents, **extra_step_kwargs, return_dict=False
        )[0]

        if callback_on_step_end is not None:
            callback_kwargs = {}
            for k in callback_on_step_end_tensor_inputs:
                callback_kwargs[k] = locals()[k]
            callback_outputs = callback_on_step_end(self, i, t, callback_kwargs)

            latents = callback_outputs.pop("latents", latents)      # 回调函数的输出会被用于更新 latents、prompt_embeds 和 negative_prompt_embeds。
            prompt_embeds = callback_outputs.pop("prompt_embeds", prompt_embeds)
            negative_prompt_embeds = callback_outputs.pop(
                "negative_prompt_embeds", negative_prompt_embeds
            )

        # call the callback, if provided        进度条更新和通用回调
        if i == len(timesteps) - 1 or (
            (i + 1) > num_warmup_steps and (i + 1) % self.scheduler.order == 0
        ):
            if progress_bar is not None:
                progress_bar.update()
            if callback is not None and i % callback_steps == 0:
                step_idx = i // getattr(self.scheduler, "order", 1)
                callback(step_idx, t, latents)

-

DiT的transformer架构

transformer的forward()函数的整体流程如下:

class HYVideoDiffusionTransformer(ModelMixin, ConfigMixin):
    def forward(...) -> Union[torch.Tensor, Dict[str, torch.Tensor]]:
        out = {}
        img = x
        txt = text_states
        _, _, ot, oh, ow = x.shape      # b,c,f,h,w
        tt, th, tw = (
            ot // self.patch_size[0],
            oh // self.patch_size[1],
            ow // self.patch_size[2],
        )

        # Prepare modulation vectors.
        vec = self.time_in(t)       # 时间嵌入:将时间步 t 转换为时间嵌入向量 vec。

        # text modulation
        vec = vec + self.vector_in(text_states_2)   # 时间步+clipl的向量

        # guidance modulation
        if self.guidance_embed:     # 如果启用了指导嵌入,将指导向量 guidance 加入到 vec 中。
            if guidance is None:
                raise ValueError(
                    "Didn't get guidance strength for guidance distilled model."
                )

            # our timestep_embedding is merged into guidance_in(TimestepEmbedder)
            vec = vec + self.guidance_in(guidance)

        # Embed image and text.
        img = self.img_in(img)      # Patchify
        if self.text_projection == "linear":
            txt = self.txt_in(txt)      # TextProjection
        elif self.text_projection == "single_refiner":
            txt = self.txt_in(txt, t, text_mask if self.use_attention_mask else None)   # SingleTokenRefiner
        else:
            raise NotImplementedError(
                f"Unsupported text_projection: {self.text_projection}"
            )

        txt_seq_len = txt.shape[1]
        img_seq_len = img.shape[1]

        # Compute cu_squlens and max_seqlen for flash attention  计算序列长度:计算图像和文本的序列长度,用于后续的注意力机制。
        cu_seqlens_q = get_cu_seqlens(text_mask, img_seq_len)
        cu_seqlens_kv = cu_seqlens_q
        max_seqlen_q = img_seq_len + txt_seq_len
        max_seqlen_kv = max_seqlen_q

        freqs_cis = (freqs_cos, freqs_sin) if freqs_cos is not None else None
        # --------------------- Pass through DiT blocks ------------------------
        for _, block in enumerate(self.double_blocks):      # 通过双流块
            double_block_args = [
                img,
                txt,
                vec,
                cu_seqlens_q,
                cu_seqlens_kv,
                max_seqlen_q,
                max_seqlen_kv,
                freqs_cis,
            ]

            img, txt = block(*double_block_args)

        # Merge txt and img to pass through single stream blocks.
        x = torch.cat((img, txt), 1)    # 合并图像和文本
        if len(self.single_blocks) > 0:
            for _, block in enumerate(self.single_blocks):      # 通过单流块
                single_block_args = [
                    x,
                    vec,
                    txt_seq_len,
                    cu_seqlens_q,
                    cu_seqlens_kv,
                    max_seqlen_q,
                    max_seqlen_kv,
                    (freqs_cos, freqs_sin),
                ]

                x = block(*single_block_args)

        img = x[:, :img_seq_len, ...]       # 获取图片

        # ---------------------------- Final layer ------------------------------
        img = self.final_layer(img, vec)  # (N, T, patch_size ** 2 * out_channels) 最终层处理

        img = self.unpatchify(img, tt, th, tw)      # 将图片重新恢复为原尺寸
        if return_dict:
            out["x"] = img
            return out
        return img

-

time_in 时间嵌入

其功能是处理输入的时间步 t 并生成时间步的嵌入向量。具体步骤如下:

  1. 使用 timestep_embedding 函数生成时间步 t 的正弦嵌入向量 t_freq。
  2. 将生成的嵌入向量 t_freq 转换为与 self.mlp 层权重相同的数据类型。
  3. 使用 self.mlp 层对转换后的嵌入向量进行处理,生成最终的嵌入向量 t_emb。
  4. 返回生成的嵌入向量 t_emb。
class TimestepEmbedder(nn.Module):
    def __init__(...):
        
        self.mlp = nn.Sequential(
            nn.Linear(
                frequency_embedding_size, hidden_size, bias=True, **factory_kwargs
            ),
            act_layer(),
            nn.Linear(hidden_size, out_size, bias=True, **factory_kwargs),
        )
        nn.init.normal_(self.mlp[0].weight, std=0.02)
        nn.init.normal_(self.mlp[2].weight, std=0.02)

    def forward(self, t):
        t_freq = timestep_embedding(        # 用于生成时间步的正弦嵌入向量
            t, self.frequency_embedding_size, self.max_period
        ).type(self.mlp[0].weight.dtype)
        t_emb = self.mlp(t_freq)
        return t_emb

-

text_modulation clip处理的向量和时间步向量融合

在输入transformer之前,我们已经用clip将文本处理好了,现在,我们需要使用MLP将clip的文本向量转换为和时间步向量尺寸相同的向量

# text modulation
self.vector_in = MLPEmbedder(
    self.text_states_dim_2, self.hidden_size, **factory_kwargs
)

具体结构

class MLPEmbedder(nn.Module):
    def __init__(self, in_dim: int, hidden_dim: int, device=None, dtype=None):
        ...
        self.in_layer = nn.Linear(in_dim, hidden_dim, bias=True, **factory_kwargs)
        self.silu = nn.SiLU()
        self.out_layer = nn.Linear(hidden_dim, hidden_dim, bias=True, **factory_kwargs)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        return self.out_layer(self.silu(self.in_layer(x)))

在transformer中,融合的代码也很简单:

其中vec是时间步的向量,可见融合就是简单的加上clipl的向量

# text modulation
vec = vec + self.vector_in(text_states_2)   # 时间步+clipl的向量

图像 Patchify

在HYVideoDiffusionTransformer的__init__()函数中定义了img_in,也就是用于对图像切片的类PatchEmbed

self.img_in = PatchEmbed(
    self.patch_size, self.in_channels, self.hidden_size, **factory_kwargs
)

PatchEmbed使用3D卷积和线性层将视频帧切片并转换为具体步骤如下:

  1. 投影变换:通过 self.proj(x) 将输入张量 x 的形状从 [b, 3, t, h, w] 变换为 \left[ b, embeddim, \left\lfloor \frac{t}{patch size} \right\rfloor, \left\lfloor \frac{h}{patch size} \right\rfloor, \left\lfloor \frac{w}{patch size} \right\rfloor \right]
  2. 展平操作:如果 self.flatten 为 True,则将张量展平并转置,使其形状变为 \left[ b, embeddim, \left\lfloor \frac{t}{patch size} \right\rfloor*\left\lfloor \frac{h}{patch size} \right\rfloor*\left\lfloor \frac{w}{patch size} \right\rfloor \right]
  3. 归一化:通过 self.norm(x) 对张量进行归一化处理。
  4. 返回结果:最终返回处理后的张量 x。
class PatchEmbed(nn.Module):
    def __init__(...):
        ...

        self.proj = nn.Conv3d(
            in_chans,
            embed_dim,
            kernel_size=patch_size,
            stride=patch_size,
            bias=bias,
            **factory_kwargs
        )
        nn.init.xavier_uniform_(self.proj.weight.view(self.proj.weight.size(0), -1))
        if bias:
            nn.init.zeros_(self.proj.bias)

        self.norm = norm_layer(embed_dim) if norm_layer else nn.Identity()

    def forward(self, x):
        x = self.proj(x)        # [b,3,t,h,w]->[b,768,t1,h1,w1] 
        if self.flatten:
            x = x.flatten(2).transpose(1, 2)  # [b,768,t1,h1,w1]->[b,768,t1*h1*w1]
        x = self.norm(x)
        return x

-

MLLM文本 Refiner

使用带门控的自注意力,用于精细化处理单个令牌(Token)

elif self.text_projection == "single_refiner":
    txt = self.txt_in(txt, t, text_mask if self.use_attention_mask else None)   # SingleTokenRefiner

具体如下:

class SingleTokenRefiner(nn.Module):
    def __init__(...):
        
        self.input_embedder = nn.Linear(
            in_channels, hidden_size, bias=True, **factory_kwargs
        )

        act_layer = get_activation_layer(act_type)
        # Build timestep embedding layer
        self.t_embedder = TimestepEmbedder(hidden_size, act_layer, **factory_kwargs)
        # Build context embedding layer
        self.c_embedder = TextProjection(
            in_channels, hidden_size, act_layer, **factory_kwargs
        )

        self.individual_token_refiner = IndividualTokenRefiner(
            hidden_size=hidden_size,
            heads_num=heads_num,
            depth=depth,
            mlp_width_ratio=mlp_width_ratio,
            mlp_drop_rate=mlp_drop_rate,
            act_type=act_type,
            qk_norm=qk_norm,
            qk_norm_type=qk_norm_type,
            qkv_bias=qkv_bias,
            **factory_kwargs,
        )

    def forward(
        self,
        x: torch.Tensor,
        t: torch.LongTensor,
        mask: Optional[torch.LongTensor] = None,
    ):
        timestep_aware_representations = self.t_embedder(t)     # 获取时间步长嵌入

        if mask is None:
            context_aware_representations = x.mean(dim=1)   # 计算x的均值
        else:
            mask_float = mask.float().unsqueeze(-1)  # [b, s1, 1]
            context_aware_representations = (x * mask_float).sum(   # 使用掩码计算加权均值
                dim=1
            ) / mask_float.sum(dim=1)
        context_aware_representations = self.c_embedder(context_aware_representations)  # 上下文嵌入
        c = timestep_aware_representations + context_aware_representations

        x = self.input_embedder(x)

        x = self.individual_token_refiner(x, c, mask)       # 一个带门控机制的多头自注意力

        return x

-

双流DiT

在transformer中的初始化方法

# double blocks
self.double_blocks = nn.ModuleList(
    [
        MMDoubleStreamBlock(
            self.hidden_size,
            self.heads_num,
            mlp_width_ratio=mlp_width_ratio,
            mlp_act_type=mlp_act_type,
            qk_norm=qk_norm,
            qk_norm_type=qk_norm_type,
            qkv_bias=qkv_bias,
            **factory_kwargs,
        )
        for _ in range(mm_double_blocks_depth)
    ]
)

我们主要来看MMDoubleStreamBlock

class MMDoubleStreamBlock(nn.Module):
    def forward() -> Tuple[torch.Tensor, torch.Tensor]:
        ## 1.获取门控的参数
        (
            img_mod1_shift,
            img_mod1_scale,
            img_mod1_gate,
            img_mod2_shift,
            img_mod2_scale,
            img_mod2_gate,
        ) = self.img_mod(vec).chunk(6, dim=-1)
        (
            txt_mod1_shift,
            txt_mod1_scale,
            txt_mod1_gate,
            txt_mod2_shift,
            txt_mod2_scale,
            txt_mod2_gate,
        ) = self.txt_mod(vec).chunk(6, dim=-1)


        # 2.Prepare image for attention.
        img_modulated = self.img_norm1(img)
        img_modulated = modulate(       # shift and scale
            img_modulated, shift=img_mod1_shift, scale=img_mod1_scale
        )
        img_qkv = self.img_attn_qkv(img_modulated)      # 计算QKV
        img_q, img_k, img_v = rearrange(
            img_qkv, "B L (K H D) -> K B L H D", K=3, H=self.heads_num
        )
        # Apply QK-Norm if needed       QK-Norm
        img_q = self.img_attn_q_norm(img_q).to(img_v)
        img_k = self.img_attn_k_norm(img_k).to(img_v)

        # Apply RoPE if needed.     3D-RoPE
        if freqs_cis is not None:
            img_qq, img_kk = apply_rotary_emb(img_q, img_k, freqs_cis, head_first=False)
            assert (
                img_qq.shape == img_q.shape and img_kk.shape == img_k.shape
            ), f"img_kk: {img_qq.shape}, img_q: {img_q.shape}, img_kk: {img_kk.shape}, img_k: {img_k.shape}"
            img_q, img_k = img_qq, img_kk

        # Prepare txt for attention.
        txt_modulated = self.txt_norm1(txt)
        txt_modulated = modulate(       # shift and scale
            txt_modulated, shift=txt_mod1_shift, scale=txt_mod1_scale
        )
        txt_qkv = self.txt_attn_qkv(txt_modulated)      # 计算QKV
        txt_q, txt_k, txt_v = rearrange(
            txt_qkv, "B L (K H D) -> K B L H D", K=3, H=self.heads_num
        )
        # Apply QK-Norm if needed.      QK-Norm
        txt_q = self.txt_attn_q_norm(txt_q).to(txt_v)
        txt_k = self.txt_attn_k_norm(txt_k).to(txt_v)

        # Run actual attention.     # 将文本和图像的QKV拼接起来
        q = torch.cat((img_q, txt_q), dim=1)
        k = torch.cat((img_k, txt_k), dim=1)
        v = torch.cat((img_v, txt_v), dim=1)
        assert (
            cu_seqlens_q.shape[0] == 2 * img.shape[0] + 1
        ), f"cu_seqlens_q.shape:{cu_seqlens_q.shape}, img.shape[0]:{img.shape[0]}"


        # 3.计算 attention
        attn = attention(
            q,
            k,
            v,
            cu_seqlens_q=cu_seqlens_q,
            cu_seqlens_kv=cu_seqlens_kv,
            max_seqlen_q=max_seqlen_q,
            max_seqlen_kv=max_seqlen_kv,
            batch_size=img_k.shape[0],
        )

        img_attn, txt_attn = attn[:, : img.shape[1]], attn[:, img.shape[1] :]


        # 4.MLP+门控
        # Calculate the img bloks.
        img = img + apply_gate(self.img_attn_proj(img_attn), gate=img_mod1_gate)
        img = img + apply_gate(
            self.img_mlp(
                modulate(       # 将注意力结果与原始图像和文本数据进行融合。
                    self.img_norm2(img), shift=img_mod2_shift, scale=img_mod2_scale
                )
            ),
            gate=img_mod2_gate,
        )

        # Calculate the txt bloks.
        txt = txt + apply_gate(self.txt_attn_proj(txt_attn), gate=txt_mod1_gate)
        txt = txt + apply_gate(
            self.txt_mlp(
                modulate(
                    self.txt_norm2(txt), shift=txt_mod2_shift, scale=txt_mod2_scale
                )
            ),
            gate=txt_mod2_gate,
        )

        return img, txt

-

可见所谓的双流DiT实际上就是带了门控机制的Transformer,这里就不多介绍了,我们以图像的为例主要看一下img_mod

self.img_mod = ModulateDiT(
    hidden_size,
    factor=6,
    act_layer=get_activation_layer("silu"),
    **factory_kwargs,
)
self.img_norm1 = nn.LayerNorm(
    hidden_size, elementwise_affine=False, eps=1e-6, **factory_kwargs
)

所谓的ModulateDiT,其实就是线性层:

class ModulateDiT(nn.Module):
    def __init__(...):
        
        self.act = act_layer()
        self.linear = nn.Linear(
            hidden_size, factor * hidden_size, bias=True, **factory_kwargs
        )

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        return self.linear(self.act(x))

-

单流DiT
# single blocks
self.single_blocks = nn.ModuleList(
    [
        MMSingleStreamBlock(
            self.hidden_size,
            self.heads_num,
            mlp_width_ratio=mlp_width_ratio,
            mlp_act_type=mlp_act_type,
            qk_norm=qk_norm,
            qk_norm_type=qk_norm_type,
            **factory_kwargs,
        )
        for _ in range(mm_single_blocks_depth)
    ]
)

代码如下:

class MMSingleStreamBlock(nn.Module):
   
    def forward() -> torch.Tensor:
        # 1.Norm
        mod_shift, mod_scale, mod_gate = self.modulation(vec).chunk(3, dim=-1)
        
        # 2.Scale and Shift
        x_mod = modulate(self.pre_norm(x), shift=mod_shift, scale=mod_scale)
        qkv, mlp = torch.split(
            self.linear1(x_mod), [3 * self.hidden_size, self.mlp_hidden_dim], dim=-1
        )
        
        # 3.Attention
        q, k, v = rearrange(qkv, "B L (K H D) -> K B L H D", K=3, H=self.heads_num)

        # Apply QK-Norm if needed.      QK-Norm
        q = self.q_norm(q).to(v)
        k = self.k_norm(k).to(v)

        # Apply RoPE if needed.
        if freqs_cis is not None:
            img_q, txt_q = q[:, :-txt_len, :, :], q[:, -txt_len:, :, :]
            img_k, txt_k = k[:, :-txt_len, :, :], k[:, -txt_len:, :, :]
            img_qq, img_kk = apply_rotary_emb(img_q, img_k, freqs_cis, head_first=False)
            assert (
                img_qq.shape == img_q.shape and img_kk.shape == img_k.shape
            ), f"img_kk: {img_qq.shape}, img_q: {img_q.shape}, img_kk: {img_kk.shape}, img_k: {img_k.shape}"
            img_q, img_k = img_qq, img_kk
            q = torch.cat((img_q, txt_q), dim=1)
            k = torch.cat((img_k, txt_k), dim=1)

        # Compute attention.
        assert (
            cu_seqlens_q.shape[0] == 2 * x.shape[0] + 1
        ), f"cu_seqlens_q.shape:{cu_seqlens_q.shape}, x.shape[0]:{x.shape[0]}"
        attn = attention(
            q,
            k,
            v,
            cu_seqlens_q=cu_seqlens_q,
            cu_seqlens_kv=cu_seqlens_kv,
            max_seqlen_q=max_seqlen_q,
            max_seqlen_kv=max_seqlen_kv,
            batch_size=x.shape[0],
        )

        # 4.计算mlp,并和attn结果合并
        # Compute activation in mlp stream, cat again and run second linear layer.
        output = self.linear2(torch.cat((attn, self.mlp_act(mlp)), 2))
        
        # 5.应用gate
        return x + apply_gate(output, gate=mod_gate)

-

FinalLayer
self.final_layer = FinalLayer(
    self.hidden_size,
    self.patch_size,
    self.out_channels,
    get_activation_layer("silu"),
    **factory_kwargs,
)

具体结构如下:

class FinalLayer(nn.Module):

    def __init__(
        
        self.norm_final = nn.LayerNorm(
            hidden_size, elementwise_affine=False, eps=1e-6, **factory_kwargs
        )
        
        self.adaLN_modulation = nn.Sequential(
            act_layer(),
            nn.Linear(hidden_size, 2 * hidden_size, bias=True, **factory_kwargs),
        )

    def forward(self, x, c):
        shift, scale = self.adaLN_modulation(c).chunk(2, dim=1)     
        x = modulate(self.norm_final(x), shift=shift, scale=scale)      # Modulation
        x = self.linear(x)  
        return x

图像恢复:

def unpatchify(self, x, t, h, w):
    x = x.reshape(shape=(x.shape[0], t, h, w, c, pt, ph, pw))       # 将输入张量 x 重塑为 (N, T, H, W, C, pt, ph, pw)。
    x = torch.einsum("nthwcopq->nctohpwq", x)       # 使用 torch.einsum 函数重新排列张量的维度。
    imgs = x.reshape(shape=(x.shape[0], c, t * pt, h * ph, w * pw))     # 将张量重塑为 (N, C, T * pt, H * ph, W * pw) 并返回。

-

解码

这段代码的主要功能是根据输入的 latents 数据和配置参数,进行解码操作生成图像 image。具体步骤如下:

  1. 检查输出类型:如果 output_type 不是 "latent",则继续执行后续步骤;否则,直接将 latents 赋值给 image。
  2. 调整 latents 的维度:
    1. 如果 latents 的形状为 (b, c, h, w),也就是图片,并且 vae 是 AutoencoderKLCausal3D 类型,则在第2维扩展 latents,形状变为形状为 (b, c, 1, w),并设置 expand_temporal_dim 为 True。
    2. 如果 latents 的形状为 (b, c, f, h, w),则不进行任何操作。
    3. 其他形状的 latents 将引发 ValueError。
  3. 调整 latents 的值:
    1. 如果 vae 配置中有 shift_factor,则对 latents 进行缩放和偏移。
    2. 否则,仅进行缩放。
  4. 解码操作:
    1. 使用 torch.autocast 进行自动混合精度计算。
    2. 如果启用了 enable_tiling,则启用平铺解码。
    3. 调用 self.vae.decode 方法解码 latents,生成 image。
  5. 调整 image 的维度:
    1. 如果 expand_temporal_dim 为 True 或 image 的第2维为1,则压缩 image 的第2维。
if not output_type == "latent":
    expand_temporal_dim = False
    # 调整 latents 的维度
    if len(latents.shape) == 4:     # bchw->bcfhw,其中f=1,(如果是图片,增加f维度)
        if isinstance(self.vae, AutoencoderKLCausal3D):
            latents = latents.unsqueeze(2)
            expand_temporal_dim = True
    elif len(latents.shape) == 5:   # bcfhw,f是帧数
        pass
    else:
        raise ValueError(
            f"Only support latents with shape (b, c, h, w) or (b, c, f, h, w), but got {latents.shape}."
        )

    if (    # 调整 latents 的值
        hasattr(self.vae.config, "shift_factor")
        and self.vae.config.shift_factor
    ):
        latents = (     # 调整潜在空间的分布
            latents / self.vae.config.scaling_factor
            + self.vae.config.shift_factor
        )
    else:
        latents = latents / self.vae.config.scaling_factor

    with torch.autocast(        # 解码操作
        device_type="cuda", dtype=vae_dtype, enabled=vae_autocast_enabled
    ):
        if enable_tiling:
            self.vae.enable_tiling()
            image = self.vae.decode(
                latents, return_dict=False, generator=generator
            )[0]
        else:
            image = self.vae.decode(
                latents, return_dict=False, generator=generator
            )[0]

    if expand_temporal_dim or image.shape[2] == 1:      # 调整 image 的维度,如果是图片则去掉f
        image = image.squeeze(2)

else:
    image = latents

-

保存视频

# Save samples
for i, sample in enumerate(samples):
    sample = samples[i].unsqueeze(0)
    time_flag = datetime.fromtimestamp(time.time()).strftime("%Y-%m-%d-%H:%M:%S")       # 时间
    save_path = f"{save_path}/{time_flag}_seed{outputs['seeds'][i]}_{outputs['prompts'][i][:100].replace('/','')}.mp4"  # 文件名
    save_videos_grid(sample, save_path, fps=24)     # 保存视频样本
    logger.info(f'Sample save to: {save_path}')     # 打印保存路径

保存视频的函数

def save_videos_grid(videos: torch.Tensor, path: str, rescale=False, n_rows=1, fps=24):
    
    videos = rearrange(videos, "b c t h w -> t b c h w")
    outputs = []
    for x in videos:        # x:[b,c,h,w]
        x = torchvision.utils.make_grid(x, nrow=n_rows)     # [c,h,w]   # 对每个时间帧 x,使用 torchvision.utils.make_grid 将其转换为网格图像
        x = x.transpose(0, 1).transpose(1, 2).squeeze(-1)   # [h,w,c]
        if rescale:
            x = (x + 1.0) / 2.0  # -1,1 -> 0,1      将图像数据裁剪到 [0, 1] 范围
        x = torch.clamp(x, 0, 1)    # 将图像数据裁剪到 [0, 1] 范围
        x = (x * 255).numpy().astype(np.uint8)  # 将图像数据裁剪到 [0, 255] 范围
        outputs.append(x)

    os.makedirs(os.path.dirname(path), exist_ok=True)
    imageio.mimsave(path, outputs, fps=fps)     # 将所有图像帧保存为视频文件

-

-

5.总结

Hunyuan-Video是腾讯推出的一款创新性开源视频生成模型,它在AI视频生成领域树立了新的标杆。该模型以其卓越的性能和对中文输入Prompt的支持,被认为是目前最好的开源视频生成基座模型之一。Hunyuan-Video的核心优势在于其能够缩小闭源和开源视频基础模型之间的差距,从而加速社区的探索和创新。

Hunyuan-Video的成功开源,不仅为视频内容创作者、研究人员和开发者提供了一个强大的工具,而且推动了AI视频技术的发展,为未来的视频生成应用开辟了新的可能性。

-

🌟 如果您喜欢这篇关于Hunyuan-Video的深入介绍,并觉得它对您有所启发,请点击文章底部的“赞”按钮,给我们一点鼓励吧!您的每一个赞都是我们继续创作优质内容的动力。

🔄 同时,别忘了点击“关注”,这样您就可以第一时间获取我们最新的AI技术动态、深度解析和行业趋势。我们承诺,只分享最有价值的信息,让您的每一次点击都充满价值。

💖 收藏这篇文章,下次您需要回顾Hunyuan-Video的详细信息时,就能快速找到它。您的收藏不仅是对我们工作的认可,也是您个人知识库的宝贵财富。

感谢您的支持,让我们携手探索AI的无限可能!

Logo

为开发者提供按需使用的算力基础设施。

更多推荐