上篇 | 使用 🤗 Transformers 进行概率时间序列预测

介绍

时间序列预测是一个重要的科学和商业问题,因此最近通过使用基于深度学习 而不是经典方法的模型也涌现出诸多创新。ARIMA 等经典方法与新颖的深度学习方法之间的一个重要区别如下。

概率预测

通常,经典方法针对数据集中的每个时间序列单独拟合。这些通常被称为“单一”或“局部”方法。然而,当处理某些应用程序的大量时间序列时,在所有可用时间序列上训练一个“全局”模型是有益的,这使模型能够从许多不同的来源学习潜在的表示。

一些经典方法是点值的 (point-valued)(意思是每个时间步只输出一个值),并且通过最小化关于基本事实数据的 L2 或 L1 类型的损失来训练模型。然而,由于预测经常用于实际决策流程中,甚至在循环中有人的干预,让模型同时也提供预测的不确定性更加有益。这也称为“概率预测”,而不是“点预测”。这需要对可以采样的概率分布进行建模。

所以简而言之,我们希望训练全局概率模型,而不是训练局部点预测模型。深度学习非常适合这一点,因为神经网络可以从几个相关的时间序列中学习表示,并对数据的不确定性进行建模。

在概率设定中学习某些选定参数分布的未来参数很常见,例如高斯分布 (Gaussian) 或 Student-T,或者学习条件分位数函数 (conditional quantile function),或使用适应时间序列设置的共型预测 (Conformal Prediction) 框架。方法的选择不会影响到建模,因此通常可以将其视为另一个超参数。通过采用经验均值或中值,人们总是可以将概率模型转变为点预测模型。

时间序列 Transformer

正如人们所想象的那样,在对本来就连续的时间序列数据建模方面,研究人员提出了使用循环神经网络 (RNN) (如 LSTM 或 GRU) 或卷积网络 (CNN) 的模型,或利用最近兴起的基于 Transformer 的训练方法,都很自然地适合时间序列预测场景。

在这篇博文中,我们将利用传统 vanilla Transformer (参考 Vaswani 等 2017 年发表的论文) 进行单变量概率预测 (univariate probabilistic forecasting) 任务 (即预测每个时间序列的一维分布) 。 由于 Encoder-Decoder Transformer 很好地封装了几个归纳偏差,所以它成为了我们预测的自然选择。

首先,使用 Encoder-Decoder 架构在推理时很有帮助。通常对于一些记录的数据,我们希望提前预知未来的一些预测步骤。可以认为这个过程类似于文本生成任务,即给定上下文,采样下一个词元 (token) 并将其传回解码器 (也称为“自回归生成”) 。类似地,我们也可以在给定某种分布类型的情况下,从中抽样以提供预测,直到我们期望的预测范围。这被称为贪婪采样 (Greedy Sampling)/搜索,此处 有一篇关于 NLP 场景预测的精彩博文。

其次,Transformer 帮助我们训练可能包含成千上万个时间点的时间序列数据。由于注意力机制的时间和内存限制,一次性将 所有 时间序列的完整历史输入模型或许不太可行。因此,在为随机梯度下降 (SGD) 构建批次时,可以考虑适当的上下文窗口大小,并从训练数据中对该窗口和后续预测长度大小的窗口进行采样。可以将调整过大小的上下文窗口传递给编码器、预测窗口传递给 causal-masked 解码器。这样一来,解码器在学习下一个值时只能查看之前的时间步。这相当于人们训练用于机器翻译的 vanilla Transformer 的过程,称为“教师强制 (Teacher Forcing)”。

Transformers 相对于其他架构的另一个好处是,我们可以将缺失值 (这在时间序列场景中很常见) 作为编码器或解码器的额外掩蔽值 (mask),并且仍然可以在不诉诸于填充或插补的情况下进行训练。这相当于 Transformers 库中 BERT 和 GPT-2 等模型的 attention_mask,在注意力矩阵 (attention matrix) 的计算中不包括填充词元。

由于传统 vanilla Transformer 的平方运算和内存要求,Transformer 架构的一个缺点是上下文和预测窗口的大小受到限制。关于这一点,可以参阅 Tay 等人于 2020 年发表的调研报告 。此外,由于 Transformer 是一种强大的架构,与 其他方法 相比,它可能会过拟合或更容易学习虚假相关性。

:hugs: Transformers 库带有一个普通的概率时间序列 Transformer 模型,简称为 Time Series Transformer。在这篇文章后面的内容中,我们将展示如何在自定义数据集上训练此类模型。

设置环境

首先,让我们安装必要的库: :hugs: Transformers、:hugs: Datasets、:hugs: Evaluate、:hugs: Accelerate 和 GluonTS

正如我们将展示的那样,GluonTS 将用于转换数据以创建特征以及创建适当的训练、验证和测试批次。

!pip install -q transformers
!pip install -q datasets
!pip install -q evaluate
!pip install -q accelerate
!pip install -q gluonts ujson

加载数据集

在这篇博文中,我们将使用 Hugging Face Hub 上提供的 tourism_monthly 数据集。该数据集包含澳大利亚 366 个地区的每月旅游流量。

此数据集是 Monash Time Series Forecasting 存储库的一部分,该存储库收纳了是来自多个领域的时间序列数据集。它可以看作是时间序列预测的 GLUE 基准。

from datasets import load_dataset
dataset = load_dataset("monash_tsf", "tourism_monthly")

可以看出,数据集包含 3 个片段: 训练、验证和测试。

dataset
>>> DatasetDict({
        train: Dataset({
            features: ['start', 'target', 'feat_static_cat', 'feat_dynamic_real', 'item_id'],
            num_rows: 366
        })
        test: Dataset({
            features: ['start', 'target', 'feat_static_cat', 'feat_dynamic_real', 'item_id'],
            num_rows: 366
        })
        validation: Dataset({
            features: ['start', 'target', 'feat_static_cat', 'feat_dynamic_real', 'item_id'],
            num_rows: 366
        })
    })

每个示例都包含一些键,其中 starttarget 是最重要的键。让我们看一下数据集中的第一个时间序列:

train_example = dataset['train'][0]
train_example.keys()

>>> dict_keys(['start', 'target', 'feat_static_cat', 'feat_dynamic_real', 'item_id'])

start 仅指示时间序列的开始 (类型为 datetime) ,而 target 包含时间序列的实际值。

start 将有助于将时间相关的特征添加到时间序列值中,作为模型的额外输入 (例如“一年中的月份”) 。因为我们已经知道数据的频率是 每月,所以也能推算第二个值的时间戳为 1979-02-01,等等。

print(train_example['start'])
print(train_example['target'])

>>> 1979-01-01 00:00:00
    [1149.8699951171875, 1053.8001708984375, ..., 5772.876953125]

验证集包含与训练集相同的数据,只是数据时间范围延长了 prediction_length 那么多。这使我们能够根据真实情况验证模型的预测。

与验证集相比,测试集还是比验证集多包含 prediction_length 时间的数据 (或者使用比训练集多出数个 prediction_length 时长数据的测试集,实现在多重滚动窗口上的测试任务)。

validation_example = dataset['validation'][0]
validation_example.keys()

>>> dict_keys(['start', 'target', 'feat_static_cat', 'feat_dynamic_real', 'item_id'])

验证的初始值与相应的训练示例完全相同:

print(validation_example['start'])
print(validation_example['target'])

>>> 1979-01-01 00:00:00
    [1149.8699951171875, 1053.8001708984375, ..., 5985.830078125]

但是,与训练示例相比,此示例具有 prediction_length=24 个额外的数据。让我们验证一下。

freq = "1M"
prediction_length = 24

assert len(train_example['target']) + prediction_length == len(validation_example['target'])

让我们可视化一下:

import matplotlib.pyplot as plt

figure, axes = plt.subplots()
axes.plot(train_example['target'], color="blue") 
axes.plot(validation_example['target'], color="red", alpha=0.5)

plt.show()

下面拆分数据:

train_dataset = dataset["train"]
test_dataset = dataset["test"]

start 更新为 pd.Period

我们要做的第一件事是根据数据的 freq 值将每个时间序列的 start 特征转换为 pandas 的 Period 索引:

from functools import lru_cache

import pandas as pd
import numpy as np

@lru_cache(10_000)
def convert_to_pandas_period(date, freq):
    return pd.Period(date, freq)

def transform_start_field(batch, freq):
    batch["start"] = [convert_to_pandas_period(date, freq) for date in batch["start"]]
    return batch

这里我们使用 datasetsset_transform 来实现:

from functools import partial

train_dataset.set_transform(partial(transform_start_field, freq=freq))
test_dataset.set_transform(partial(transform_start_field, freq=freq))

定义模型

接下来,让我们实例化一个模型。该模型将从头开始训练,因此我们不使用 from_pretrained 方法,而是从 config 中随机初始化模型。

我们为模型指定了几个附加参数:

  • prediction_length (在我们的例子中是 24 个月) : 这是 Transformer 的解码器将学习预测的范围;
  • context_length: 如果未指定 context_length,模型会将 context_length (编码器的输入) 设置为等于 prediction_length;
  • 给定频率的 lags(滞后): 这将决定模型“回头看”的程度,也会作为附加特征。例如对于 Daily 频率,我们可能会考虑回顾 [1, 2, 7, 30, ...],也就是回顾 1、2……天的数据,而对于 Minute数据,我们可能会考虑[1, 30, 60, 60*24, …]` 等;
  • 时间特征的数量: 在我们的例子中设置为 2,因为我们将添加 MonthOfYearAge 特征;
  • 静态类别型特征的数量: 在我们的例子中,这将只是 1,因为我们将添加一个“时间序列 ID”特征;
  • 基数: 将每个静态类别型特征的值的数量构成一个列表,对于本例来说将是 [366],因为我们有 366 个不同的时间序列;
  • 嵌入维度: 每个静态类别型特征的嵌入维度,也是构成列表。例如 [3] 意味着模型将为每个 ``366时间序列 (区域) 学习大小为3` 的嵌入向量。

让我们使用 GluonTS 为给定频率 (“每月”) 提供的默认滞后值:

from gluonts.time_feature import get_lags_for_frequency

lags_sequence = get_lags_for_frequency(freq)
print(lags_sequence)

>>> [1, 2, 3, 4, 5, 6, 7, 11, 12, 13, 23, 24, 25, 35, 36, 37]

这意味着我们每个时间步将回顾长达 37 个月的数据,作为附加特征。

我们还检查 GluonTS 为我们提供的默认时间特征:

from gluonts.time_feature import time_features_from_frequency_str

time_features = time_features_from_frequency_str(freq)
print(time_features)

>>> [<function month_of_year at 0x7fa496d0ca70>]

在这种情况下,只有一个特征,即“一年中的月份”。这意味着对于每个时间步长,我们将添加月份作为标量值 (例如,如果时间戳为 “january”,则为 1;如果时间戳为 “february”,则为 2,等等) 。

我们现在准备好定义模型需要的所有内容了:

from transformers import TimeSeriesTransformerConfig, TimeSeriesTransformerForPrediction

config = TimeSeriesTransformerConfig(
    prediction_length=prediction_length,
    context_length=prediction_length*3, # context length
    lags_sequence=lags_sequence,
    num_time_features=len(time_features) + 1, # we'll add 2 time features ("month of year" and "age", see further)
    num_static_categorical_features=1, # we have a single static categorical feature, namely time series ID
    cardinality=[len(train_dataset)], # it has 366 possible values
    embedding_dimension=[2], # the model will learn an embedding of size 2 for each of the 366 possible values
    encoder_layers=4, 
    decoder_layers=4,
)

model = TimeSeriesTransformerForPrediction(config)

请注意,与 :hugs: Transformers 库中的其他模型类似,TimeSeriesTransformerModel 对应于没有任何顶部前置头的编码器-解码器 Transformer,而 TimeSeriesTransformerForPrediction 对应于顶部有一个分布前置头 (distribution head) 的 TimeSeriesTransformerModel。默认情况下,该模型使用 Student-t 分布 (也可以自行配置):

model.config.distribution_output

>>> student_t

这是具体实现层面与用于 NLP 的 Transformers 的一个重要区别,其中头部通常由一个固定的分类分布组成,实现为 nn.Linear 层。

定义转换

接下来,我们定义数据的转换,尤其是需要基于样本数据集或通用数据集来创建其中的时间特征。

同样,我们用到了 GluonTS 库。这里定义了一个 Chain (有点类似于图像训练的 torchvision.transforms.Compose) 。它允许我们将多个转换组合到一个流水线中。

from gluonts.time_feature import time_features_from_frequency_str, TimeFeature, get_lags_for_frequency
from gluonts.dataset.field_names import FieldName
from gluonts.transform import (
    AddAgeFeature,
    AddObservedValuesIndicator,
    AddTimeFeatures,
    AsNumpyArray,
    Chain,
    ExpectedNumInstanceSampler,
    InstanceSplitter,
    RemoveFields,
    SelectFields,
    SetField,
    TestSplitSampler,
    Transformation,
    ValidationSplitSampler,
    VstackFeatures,
    RenameFields,
)

下面的转换代码带有注释供大家查看具体的操作步骤。从全局来说,我们将迭代数据集的各个时间序列并添加、删除某些字段或特征:

from transformers import PretrainedConfig

def create_transformation(freq: str, config: PretrainedConfig) -> Transformation:
    remove_field_names = []
    if config.num_static_real_features == 0:
        remove_field_names.append(FieldName.FEAT_STATIC_REAL)
    if config.num_dynamic_real_features == 0:
        remove_field_names.append(FieldName.FEAT_DYNAMIC_REAL)

    # 类似 torchvision.transforms.Compose
    return Chain(
        # 步骤 1: 如果静态或动态字段没有特殊声明,则将它们移除
        [RemoveFields(field_names=remove_field_names)]
        # 步骤 2: 如果静态特征存在,就直接使用,否则添加一些虚拟值
        + (
            [SetField(output_field=FieldName.FEAT_STATIC_CAT, value=[0])]
            if not config.num_static_categorical_features > 0
            else []
        )
        + (
            [SetField(output_field=FieldName.FEAT_STATIC_REAL, value=[0.0])]
            if not config.num_static_real_features > 0
            else []
        )
        # 步骤 3: 将数据转换为 NumPy 格式 (应该用不上)
        + [
            AsNumpyArray(
                field=FieldName.FEAT_STATIC_CAT,
                expected_ndim=1,
                dtype=int,
            ),
            AsNumpyArray(
                field=FieldName.FEAT_STATIC_REAL,
                expected_ndim=1,
            ),
            AsNumpyArray(
                field=FieldName.TARGET,
                # 接下来一行我们为时间维度的数据加上 1
                expected_ndim=1 if config.input_size==1 else 2,
            ),
            # 步骤 4: 目标值遇到 NaN 时,用 0 填充
            # 然后返回观察值的掩蔽值
            # 存在观察值时为 true,NaN 时为 false
            # 解码器会使用这些掩蔽值 (遇到非观察值时不会产生损失值)
            # 具体可以查看 xxxForPrediction 模型的 loss_weights 说明
            AddObservedValuesIndicator(
                target_field=FieldName.TARGET,
                output_field=FieldName.OBSERVED_VALUES,
            ),
            # 步骤 5: 根据数据集的 freq 字段添加暂存值
            # 也就是这里的“一年中的月份”
            # 这些暂存值将作为定位编码使用
            AddTimeFeatures(
                start_field=FieldName.START,
                target_field=FieldName.TARGET,
                output_field=FieldName.FEAT_TIME,
                time_features=time_features_from_frequency_str(freq),
                pred_length=config.prediction_length,
            ),
            # 步骤 6: 添加另一个暂存值 (一个单一数字)
            # 用于让模型知道当前值在时间序列中的位置
            # 类似于一个步进计数器
            AddAgeFeature(
                target_field=FieldName.TARGET,
                output_field=FieldName.FEAT_AGE,
                pred_length=config.prediction_length,
                log_scale=True,
            ),
            # 步骤 7: 将所有暂存特征值纵向堆叠
            VstackFeatures(
                output_field=FieldName.FEAT_TIME,
                input_fields=[FieldName.FEAT_TIME, FieldName.FEAT_AGE]
                + ([FieldName.FEAT_DYNAMIC_REAL] if config.num_dynamic_real_features > 0 else []),
            ),
            # 步骤 8: 建立字段名和 Hugging Face 惯用字段名之间的映射
            RenameFields(
                mapping={
                    FieldName.FEAT_STATIC_CAT: "static_categorical_features",
                    FieldName.FEAT_STATIC_REAL: "static_real_features",
                    FieldName.FEAT_TIME: "time_features",
                    FieldName.TARGET: "values",
                    FieldName.OBSERVED_VALUES: "observed_mask",
                }
            ),
        ]
    )

定义 InstanceSplitter

对于训练、验证、测试步骤,接下来我们创建一个 InstanceSplitter,用于从数据集中对窗口进行采样 (因为由于时间和内存限制,我们无法将整个历史值传递给 Transformer)。

实例拆分器从数据中随机采样大小为 context_length 和后续大小为 prediction_length 的窗口,并将 past_future_ 键附加到各个窗口的任何临时键。这确保了 values 被拆分为 past_values 和后续的 future_values 键,它们将分别用作编码器和解码器的输入。同样我们还需要修改 time_series_fields 参数中的所有键:

from gluonts.transform.sampler import InstanceSampler
from typing import Optional

def create_instance_splitter(config: PretrainedConfig, mode: str, train_sampler: Optional[InstanceSampler] = None,
    validation_sampler: Optional[InstanceSampler] = None,) -> Transformation:
    assert mode in ["train", "validation", "test"]

    instance_sampler = {
        "train": train_sampler or ExpectedNumInstanceSampler(
            num_instances=1.0, min_future=config.prediction_length
        ),
        "validation":  validation_sampler or ValidationSplitSampler(
            min_future=config.prediction_length
        ),
        "test": TestSplitSampler(),
    }[mode]

    return InstanceSplitter(
        target_field="values",
        is_pad_field=FieldName.IS_PAD,
        start_field=FieldName.START,
        forecast_start_field=FieldName.FORECAST_START,
        instance_sampler=instance_sampler,
        past_length=config.context_length + max(config.lags_sequence),
        future_length=config.prediction_length,
        time_series_fields=[
            "time_features",
            "observed_mask",
        ],
    )

以上是《使用 :hugs: Transformers 进行概率时间序列预测》的第一部分,我们在这部分中为大家介绍了传统时间序列预测和基于 Transformers 的方法,也一步步准备好了训练所需的数据集并定义了环境、模型、转换和 InstanceSplitter。在下一部分里,我们会开始训练,并采用可视化的方法评估模型预测的效果。请关注我们的内容,不要错过后续精彩。

英文原文: Probabilistic Time Series Forecasting with :hugs: Transformers

译者、排版: zhongdongy (阿东)