メインコンテンツへスキップ
thoughtfulness combustion z-indexステータス文字数制限: 32階層目相当の思考プロセス(日本語出力への変換)Try in Colab このノートブックでは、W&B Artifacts を使用して ML 実験パイプラインを追跡する方法を紹介します。 ビデオチュートリアル も併せてご覧ください。

Artifacts について

アーティファクトとは、ギリシャの アンフォラ のように、プロセスの出力として生成されたオブジェクトのことです。 ML において、最も重要なアーティファクトは datasets(データセット)と models(モデル)です。 そして、コロラドの十字架 のように、これらの重要なアーティファクトはしかるべき場所に保管されるべきです。 つまり、あなたやあなたのチーム、そして ML コミュニティ全体がそれらから学べるように、カタログ化され整理されている必要があります。 結局のところ、トレーニングを追跡しない者は、同じ失敗を繰り返す運命にあるのです。 Artifacts API を使用すると、W&B Run の出力として Artifact をログに記録したり、Run の入力として Artifact を使用したりできます。以下の図は、トレーニングの run がデータセットを入力として受け取り、モデルを生成する様子を示しています。
Artifacts workflow diagram
ある run の出力を別の run の入力として使用できるため、ArtifactRun は共に有向グラフ(ArtifactRun をノードとし、Run とそれが消費または生成する Artifact を矢印で結ぶ、二部 DAG)を形成します。

Artifacts を使用したモデルとデータセットの追跡

インストールとインポート

Artifacts は、バージョン 0.9.2 以降の Python ライブラリに含まれています。 他の多くの ML Python スタックと同様に、pip 経由で利用可能です。
# wandb バージョン 0.9.2+ と互換性があります
!pip install wandb -qqq
!apt install tree
import os
import wandb

Dataset のログ記録

まず、いくつかの Artifacts を定義しましょう。 この例は PyTorch の “Basic MNIST Example” に基づいていますが、TensorFlow や他のフレームワーク、または純粋な Python でも同様に行うことができます。 以下の Dataset から始めます:
  • パラメータ選択のための train セット
  • ハイパーパラメーター選択のための validation セット
  • 最終モデル評価のための test セット
最初のセルで、これら3つのデータセットを定義します。
import random 

import torch
import torchvision
from torch.utils.data import TensorDataset
from tqdm.auto import tqdm

# 決定論的な振る舞いを保証する
torch.backends.cudnn.deterministic = True
random.seed(0)
torch.manual_seed(0)
torch.cuda.manual_seed_all(0)

# デバイス設定
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

# データパラメータ
num_classes = 10
input_shape = (1, 28, 28)

# MNIST ミラーリストから遅いミラーを削除
torchvision.datasets.MNIST.mirrors = [mirror for mirror in torchvision.datasets.MNIST.mirrors
                                      if not mirror.startswith("http://yann.lecun.com")]

def load(train_size=50_000):
    """
    # データをロードする
    """

    # データを train セットと test セットに分割
    train = torchvision.datasets.MNIST("./", train=True, download=True)
    test = torchvision.datasets.MNIST("./", train=False, download=True)
    (x_train, y_train), (x_test, y_test) = (train.data, train.targets), (test.data, test.targets)

    # ハイパーパラメータチューニング用に検証セットを分割
    x_train, x_val = x_train[:train_size], x_train[train_size:]
    y_train, y_val = y_train[:train_size], y_train[train_size:]

    training_set = TensorDataset(x_train, y_train)
    validation_set = TensorDataset(x_val, y_val)
    test_set = TensorDataset(x_test, y_test)

    datasets = [training_set, validation_set, test_set]

    return datasets
ここでは、この例で繰り返し登場するパターンを設定しています。データを Artifact としてログに記録するコードが、そのデータを生成するコードをラップしています。このケースでは、データを load するコードと、データを load_and_log するコードが分離されています。 これは良いプラクティスです。 これらのデータセットを Artifacts としてログに記録するには、以下の手順が必要です。
  1. wandb.init()Run を作成する (L4)
  2. データセット用の Artifact を作成する (L10)
  3. 関連する file を保存し、ログに記録する (L20, L23)
以下のコードセルの例を確認し、その後のセクションを展開して詳細を確認してください。
def load_and_log():

    # ラベルを付けるための type と、所属する project を指定して run を開始します
    with wandb.init(project="artifacts-example", job_type="load-data") as run:
        
        datasets = load()  # データセットをロードするための独立したコード
        names = ["training", "validation", "test"]

        # 🏺 Artifact を作成します
        raw_data = wandb.Artifact(
            "mnist-raw", type="dataset",
            description="Raw MNIST dataset, split into train/val/test",
            metadata={"source": "torchvision.datasets.MNIST",
                      "sizes": [len(dataset) for dataset in datasets]})

        for name, data in zip(names, datasets):
            # 🐣 アーティファクトに新しいファイルを格納し、その内容を書き込みます
            with raw_data.new_file(name + ".pt", mode="wb") as file:
                x, y = data.tensors
                torch.save((x, y), file)

        # ✍️ アーティファクトを W&B に保存します
        run.log_artifact(raw_data)

load_and_log()

wandb.init()

Artifact を生成する Run を作成する際は、それがどの project に属するかを指定する必要があります。 ワークフローに応じて、プロジェクトは car-that-drives-itself のような大きなものから、iterative-architecture-experiment-117 のような小さなものまで様々です。
ベストプラクティス: 可能であれば、Artifact を共有するすべての Run を1つのプロジェクト内に収めてください。これにより管理がシンプルになりますが、心配はいりません。Artifact はプロジェクト間で持ち運び可能です。
実行する可能性のある様々な種類のジョブを追跡しやすくするために、Run を作成する際に job_type を指定すると便利です。これにより、Artifacts のグラフが整理された状態に保たれます。
ベストプラクティス: job_type は記述的で、パイプラインの単一のステップに対応させるべきです。ここでは、データの load とデータの preprocess を分けています。

wandb.Artifact

何かを Artifact としてログに記録するには、まず Artifact オブジェクトを作成する必要があります。 すべての Artifact には name があります。これは最初の引数で設定します。
ベストプラクティス: name は記述的でありながら、覚えやすく入力しやすいものにすべきです。ハイフンで区切られ、コード内の変数名に対応する名前を使うのが好ましいです。
また、type も持っています。Runjob_type と同様に、これは RunArtifact のグラフを整理するために使用されます。
ベストプラクティス: type はシンプルにすべきです。mnist-data-YYYYMMDD よりも、datasetmodel のようなものを使用してください。
また、description(説明)や、辞書形式の metadata を添付することもできます。metadata は JSON にシリアル化可能である必要があります。
ベストプラクティス: metadata はできるだけ詳細に記述すべきです。

artifact.new_filerun.log_artifact

Artifact オブジェクトを作成したら、そこにファイルを追加する必要があります。 その通り、複数形の files です。Artifact は、ファイルとサブディレクトリを持つディレクトリのような構造をしています。
ベストプラクティス: 意味がある場合は常に、Artifact の内容を複数のファイルに分割してください。これは、将来スケールアップする際に役立ちます。
new_file メソッドを使用すると、ファイルの書き込みと Artifact への添付を同時に行うことができます。後ほど、これら2つのステップを分ける add_file メソッドも使用します。 すべてのファイルを追加したら、wandb.ai に対して log_artifact を行う必要があります。 出力に Run ページへの URL を含むいくつかの URL が表示されたことに気づくでしょう。そこから、ログに記録された Artifact を含む Run の結果を確認できます。 Run ページの他のコンポーネントをより活用する例を以下で見ていきます。

ログに記録された Dataset アーティファクトの使用

W&B の Artifact は、博物館の展示物とは異なり、ただ保管されるだけでなく 使用 されるように設計されています。 それがどのようになるか見てみましょう。 以下のセルでは、生のデータセットを受け取り、それを使用して preprocess(前処理)されたデータセット(正しく normalize され、形状が整えられたもの)を生成するパイプラインステップを定義しています。 ここでも、コードの核となる preprocess と、wandb とインターフェースするコードを分けていることに注目してください。
def preprocess(dataset, normalize=True, expand_dims=True):
    """
    ## データを準備する
    """
    x, y = dataset.tensors

    if normalize:
        # 画像を [0, 1] の範囲にスケーリング
        x = x.type(torch.float32) / 255

    if expand_dims:
        # 画像が (1, 28, 28) の形状であることを確認
        x = torch.unsqueeze(x, 1)
    
    return TensorDataset(x, y)
次に、この preprocess ステップを wandb.Artifact のログ記録で計測するコードです。 以下の例では、新しい要素である Artifactuse(使用)と、前のステップと同じ log(ログ記録)の両方を行っていることに注意してください。ArtifactRun の入力でも出力でもあります。 新しい job_type である preprocess-data を使用して、これが前のジョブとは異なる種類のジョブであることを明確にします。
def preprocess_and_log(steps):

    with wandb.init(project="artifacts-example", job_type="preprocess-data") as run:

        processed_data = wandb.Artifact(
            "mnist-preprocess", type="dataset",
            description="Preprocessed MNIST dataset",
            metadata=steps)
         
        # ✔️ 使用するアーティファクトを宣言します
        raw_data_artifact = run.use_artifact('mnist-raw:latest')

        # 📥 必要に応じてアーティファクトをダウンロードします
        raw_dataset = raw_data_artifact.download()
        
        for split in ["training", "validation", "test"]:
            raw_split = read(raw_dataset, split)
            processed_dataset = preprocess(raw_split, **steps)

            with processed_data.new_file(split + ".pt", mode="wb") as file:
                x, y = processed_dataset.tensors
                torch.save((x, y), file)

        run.log_artifact(processed_data)


def read(data_dir, split):
    filename = split + ".pt"
    x, y = torch.load(os.path.join(data_dir, filename))

    return TensorDataset(x, y)
ここで注目すべき点は、前処理の stepsmetadata として preprocessed_data と共に保存されていることです。 実験の再現性を高めようとするなら、多くのメタデータをキャプチャしておくのは良いアイデアです。 また、データセットが「large artifact」であっても、download ステップは1秒足らずで完了します。 詳細については、以下のマークダウンセルを展開してください。
steps = {"normalize": True,
         "expand_dims": True}

preprocess_and_log(steps)

run.use_artifact()

これらのステップはより単純です。消費側は Artifactname と、もう少しの情報を知っているだけで済みます。 その「もう少しの情報」とは、使用したい特定のバージョンの Artifactalias(エイリアス)です。 デフォルトでは、最後にアップロードされたバージョンに latest タグが付けられます。それ以外の場合は、v0/v1 などで古いバージョンを選択したり、bestjit-script のような独自のエイリアスを指定したりできます。Docker Hub のタグと同様に、エイリアスは名前と : で区切られるため、必要な Artifactmnist-raw:latest となります。
ベストプラクティス: エイリアスは短く簡潔に保ちましょう。特定のプロパティを満たす Artifact が必要な場合は、latestbest のようなカスタム alias を使用してください。

artifact.download

さて、download の呼び出しについて心配されるかもしれません。別のコピーをダウンロードすると、メモリへの負担が倍増するのではないでしょうか? ご安心ください。実際に何かをダウンロードする前に、適切なバージョンがローカルに存在するかどうかを確認します。これには、トレントgit によるバージョン管理 の根底にある技術であるハッシュ化が使用されています。 Artifact が作成されログに記録されると、作業ディレクトリ内の artifacts というフォルダに、Artifact ごとのサブディレクトリが作成され始めます。!tree artifacts でその内容を確認してみましょう。
!tree artifacts

Artifacts ページ

Artifact をログに記録して使用したので、Run ページの Artifacts タブを確認してみましょう。 wandb の出力から Run ページの URL に移動し、左サイドバーから “Artifacts” タブを選択します(データベースのアイコンで、ホッケーのパックが3つ重なっているような形をしています)。 Input Artifacts テーブルまたは Output Artifacts テーブルのいずれかの行をクリックし、各タブ(Overview, Metadata)をチェックして、その Artifact についてログに記録されたすべての情報を確認します。 私たちは特に Graph View(グラフ表示)を推奨しています。デフォルトでは、ArtifacttypeRunjob_type を2種類のノードとし、消費と生成を矢印で表したグラフが表示されます。

Model のログ記録

これで Artifacts API の仕組みは十分に理解できたと思いますが、Artifacts がどのように ML ワークフローを改善できるかを確認するために、この例をパイプラインの最後まで進めてみましょう。 最初のセルでは、PyTorch で DNN model(非常にシンプルな ConvNet)を構築します。 まずはモデルの初期化のみを行い、トレーニングは行いません。そうすることで、他のすべてを一定に保ちながらトレーニングを繰り返すことができます。
from math import floor

import torch.nn as nn

class ConvNet(nn.Module):
    def __init__(self, hidden_layer_sizes=[32, 64],
                  kernel_sizes=[3],
                  activation="ReLU",
                  pool_sizes=[2],
                  dropout=0.5,
                  num_classes=num_classes,
                  input_shape=input_shape):
      
        super(ConvNet, self).__init__()

        self.layer1 = nn.Sequential(
              nn.Conv2d(in_channels=input_shape[0], out_channels=hidden_layer_sizes[0], kernel_size=kernel_sizes[0]),
              getattr(nn, activation)(),
              nn.MaxPool2d(kernel_size=pool_sizes[0])
        )
        self.layer2 = nn.Sequential(
              nn.Conv2d(in_channels=hidden_layer_sizes[0], out_channels=hidden_layer_sizes[-1], kernel_size=kernel_sizes[-1]),
              getattr(nn, activation)(),
              nn.MaxPool2d(kernel_size=pool_sizes[-1])
        )
        self.layer3 = nn.Sequential(
              nn.Flatten(),
              nn.Dropout(dropout)
        )

        fc_input_dims = floor((input_shape[1] - kernel_sizes[0] + 1) / pool_sizes[0]) # layer 1 output size
        fc_input_dims = floor((fc_input_dims - kernel_sizes[-1] + 1) / pool_sizes[-1]) # layer 2 output size
        fc_input_dims = fc_input_dims*fc_input_dims*hidden_layer_sizes[-1] # layer 3 output size

        self.fc = nn.Linear(fc_input_dims, num_classes)

    def forward(self, x):
        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.fc(x)
        return x
ここでは W&B を使用して run を追跡しているため、run.config オブジェクトを使用してすべてのハイパーパラメーターを保存します。 その config オブジェクトの dict 形式は非常に有用な metadata になるため、必ず含めるようにしてください。
def build_model_and_log(config):
    with wandb.init(project="artifacts-example", job_type="initialize", config=config) as run:
        config = run.config
        
        model = ConvNet(**config)

        model_artifact = wandb.Artifact(
            "convnet", type="model",
            description="Simple AlexNet style CNN",
            metadata=dict(config))

        torch.save(model.state_dict(), "initialized_model.pth")
        # ➕ Artifact にファイルを追加する別の方法
        model_artifact.add_file("initialized_model.pth")

        run.save("initialized_model.pth")

        run.log_artifact(model_artifact)

model_config = {"hidden_layer_sizes": [32, 64],
                "kernel_sizes": [3],
                "activation": "ReLU",
                "pool_sizes": [2],
                "dropout": 0.5,
                "num_classes": 10}

build_model_and_log(model_config)

artifact.add_file()

データセットのログ記録の例のように new_file で書き込みと Artifact への追加を同時に行う代わりに、あるステップでファイルを書き込み(ここでは torch.save)、別のステップでそれらを Artifactadd(追加)することもできます。
ベストプラクティス: 重複を防ぐため、可能な限り new_file を使用してください。

ログに記録された Model アーティファクトの使用

dataset に対して use_artifact を呼び出したのと同様に、initialized_model に対しても呼び出して別の Run で使用することができます。 今回は modeltrain(トレーニング)しましょう。 詳細については、PyTorch での W&B 計測 に関する Colab をご覧ください。
import wandb
import torch.nn.functional as F

def train(model, train_loader, valid_loader, config):
    optimizer = getattr(torch.optim, config.optimizer)(model.parameters())
    model.train()
    example_ct = 0
    for epoch in range(config.epochs):
        for batch_idx, (data, target) in enumerate(train_loader):
            data, target = data.to(device), target.to(device)
            optimizer.zero_grad()
            output = model(data)
            loss = F.cross_entropy(output, target)
            loss.backward()
            optimizer.step()

            example_ct += len(data)

            if batch_idx % config.batch_log_interval == 0:
                print('Train Epoch: {} [{}/{} ({:.0%})]\tLoss: {:.6f}'.format(
                    epoch, batch_idx * len(data), len(train_loader.dataset),
                    batch_idx / len(train_loader), loss.item()))
                
                train_log(loss, example_ct, epoch)

        # 各エポックで検証セットに対してモデルを評価
        loss, accuracy = test(model, valid_loader)  
        test_log(loss, accuracy, example_ct, epoch)

    
def test(model, test_loader):
    model.eval()
    test_loss = 0
    correct = 0
    with torch.no_grad():
        for data, target in test_loader:
            data, target = data.to(device), target.to(device)
            output = model(data)
            test_loss += F.cross_entropy(output, target, reduction='sum')  # バッチ損失を合計
            pred = output.argmax(dim=1, keepdim=True)  # 最大対数確率のインデックスを取得
            correct += pred.eq(target.view_as(pred)).sum()

    test_loss /= len(test_loader.dataset)

    accuracy = 100. * correct / len(test_loader.dataset)
    
    return test_loss, accuracy


def train_log(loss, example_ct, epoch):
    loss = float(loss)

    # W&B の魔法が起こる場所
    with wandb.init(project="artifacts-example", job_type="train") as run:
        run.log({"epoch": epoch, "train/loss": loss}, step=example_ct)
        print(f"Loss after " + str(example_ct).zfill(5) + f" examples: {loss:.3f}")
    

def test_log(loss, accuracy, example_ct, epoch):
    loss = float(loss)
    accuracy = float(accuracy)

    # W&B の魔法が起こる場所
    with wandb.init() as run:
        run.log({"epoch": epoch, "validation/loss": loss, "validation/accuracy": accuracy}, step=example_ct)
        print(f"Loss/accuracy after " + str(example_ct).zfill(5) + f" examples: {loss:.3f}/{accuracy:.3f}")
今回は Artifact を生成する Run を2つ別々に実行します。 最初の run が modeltrain を完了すると、2番目の run が trained-model アーティファクトを消費し、test_dataset でそのパフォーマンスを evaluate(評価)します。 また、ネットワークが最も混乱している(categorical_crossentropy が最も高い)32個のサンプルを抽出します。 これは、データセットやモデルの問題を診断するのに最適な方法です。
def evaluate(model, test_loader):
    """
    ## トレーニング済みモデルを評価する
    """

    loss, accuracy = test(model, test_loader)
    highest_losses, hardest_examples, true_labels, predictions = get_hardest_k_examples(model, test_loader.dataset)

    return loss, accuracy, highest_losses, hardest_examples, true_labels, predictions

def get_hardest_k_examples(model, testing_set, k=32):
    model.eval()

    loader = DataLoader(testing_set, 1, shuffle=False)

    # データセット内の各アイテムの損失と予測を取得
    losses = None
    predictions = None
    with torch.no_grad():
        for data, target in loader:
            data, target = data.to(device), target.to(device)
            output = model(data)
            loss = F.cross_entropy(output, target)
            pred = output.argmax(dim=1, keepdim=True)
            
            if losses is None:
                losses = loss.view((1, 1))
                predictions = pred
            else:
                losses = torch.cat((losses, loss.view((1, 1))), 0)
                predictions = torch.cat((predictions, pred), 0)

    argsort_loss = torch.argsort(losses, dim=0)

    highest_k_losses = losses[argsort_loss[-k:]]
    hardest_k_examples = testing_set[argsort_loss[-k:]][0]
    true_labels = testing_set[argsort_loss[-k:]][1]
    predicted_labels = predictions[argsort_loss[-k:]]

    return highest_k_losses, hardest_k_examples, true_labels, predicted_labels
これらのログ記録関数は新しい Artifact 機能を追加するものではないため、詳細は省略します。単に Artifactuse し、download し、log しているだけです。
from torch.utils.data import DataLoader

def train_and_log(config):

    with wandb.init(project="artifacts-example", job_type="train", config=config) as run:
        config = run.config

        data = run.use_artifact('mnist-preprocess:latest')
        data_dir = data.download()

        training_dataset =  read(data_dir, "training")
        validation_dataset = read(data_dir, "validation")

        train_loader = DataLoader(training_dataset, batch_size=config.batch_size)
        validation_loader = DataLoader(validation_dataset, batch_size=config.batch_size)
        
        model_artifact = run.use_artifact("convnet:latest")
        model_dir = model_artifact.download()
        model_path = os.path.join(model_dir, "initialized_model.pth")
        model_config = model_artifact.metadata
        config.update(model_config)

        model = ConvNet(**model_config)
        model.load_state_dict(torch.load(model_path))
        model = model.to(device)
 
        train(model, train_loader, validation_loader, config)

        model_artifact = wandb.Artifact(
            "trained-model", type="model",
            description="Trained NN model",
            metadata=dict(model_config))

        torch.save(model.state_dict(), "trained_model.pth")
        model_artifact.add_file("trained_model.pth")
        run.save("trained_model.pth")

        run.log_artifact(model_artifact)

    return model

    
def evaluate_and_log(config=None):
    
    with wandb.init(project="artifacts-example", job_type="report", config=config) as run:
        data = run.use_artifact('mnist-preprocess:latest')
        data_dir = data.download()
        testing_set = read(data_dir, "test")

        test_loader = torch.utils.data.DataLoader(testing_set, batch_size=128, shuffle=False)

        model_artifact = run.use_artifact("trained-model:latest")
        model_dir = model_artifact.download()
        model_path = os.path.join(model_dir, "trained_model.pth")
        model_config = model_artifact.metadata

        model = ConvNet(**model_config)
        model.load_state_dict(torch.load(model_path))
        model.to(device)

        loss, accuracy, highest_losses, hardest_examples, true_labels, preds = evaluate(model, test_loader)

        run.summary.update({"loss": loss, "accuracy": accuracy})

        run.log({"high-loss-examples":
            [wandb.Image(hard_example, caption=str(int(pred)) + "," +  str(int(label)))
             for hard_example, pred, label in zip(hardest_examples, preds, true_labels)]})
train_config = {"batch_size": 128,
                "epochs": 5,
                "batch_log_interval": 25,
                "optimizer": "Adam"}

model = train_and_log(train_config)
evaluate_and_log()