文 / Google Cloud Platform 技術主管 Lak Lakshmanan (@lak_gcp)
張量處理單元 (TPU) 可加速處理 Google 內各種機器學習工作負載,并可供 Google Cloud 客戶使用。您可以在Cloud TPU 參考模型存儲區找到啟用 TPU 的頂尖圖像模型版本,例如 ResNet 和 AmoebaNet;您還可以使用強大的Tensor2Tensor庫,在 TPU 上執行文本摘要和問答任務。這些教程會為您分步介紹如何使用當下很多最熱門的 Cloud TPU 參考模型。
注:存儲區鏈接
https://github.com/tensorflow/tpu
教程鏈接
https://cloud.google.com/tpu/docs/tutorials
但如果您擁有自定義 TensorFlow 模型,又該如何做呢?在本文中,我會逐步介紹編寫自定義估算器以便在 Cloud TPU 上運行的全過程。在此過程中,我會指出需要注意的地方和建議采用的最佳實踐。您可以在 GitHub 上找到此解決方案的完整代碼;本文僅列出相關代碼片段。
注:解決方案的完整代碼鏈接
https://github.com/GoogleCloudPlatform/training-data-analyst/tree/master/courses/machine_learning/deepdive/08_image/flowersmodeltpu
自定義 TensorFlow 估算器包含以模型函數傳遞的基類估算器:
1def train_and_evaluate(output_dir, nsteps):
2estimator = tf.estimator.Estimator(
3model_fn = model_fn,
4model_dir = output_dir)
模型函數會接收特征、標簽和模式,并返回 EstimatorSpec。例如,圖像分類問題的模型函數可能包含
1def model_fn(features, labels, mode):
2# write the model to compute predictions, loss, etc. from the model
3
4return tf.estimator.EstimatorSpec(
5mode=mode,
6predictions={"probabilities": probabilities,
7 "classid": class_int, "class": class_str},
8loss=loss,
9train_op=train_op,
10eval_metric_ops=evalmetrics,
11export_outputs={'classes': tf.estimator.export.PredictOutput(
12{"probabilities": probabilities, "classid": class_int,
13 "class": class_str})}
14)
TensorFlow 中的 tf.contrib.tpu 包提供了包裝器類,可助您以適當方式編寫代碼,以便在 CPU、GPU 和 Cloud TPU 上運行代碼。下面我們就來看看如何以這種不受加速器限制的方式編寫自定義估計器。
1.將輸入數據轉換為 TF 記錄
Cloud TPU 的速度非常快,一不小心,您的訓練就會變成以讀寫(“饋入” 和 “饋出”)數據和存儲檢查點為主。讓 TPU 等待輸入/輸出會造成浪費,所以我們會做幾件事以充分利用 TPU 用于計算的時間。
首先是避免在估算器的輸入函數中進行數據解析和整理,而是預先將數據轉換為 TF 記錄。與單個圖像文件相比,批量處理 TF 記錄更為簡單,因為記錄本身包含標簽,如此可以減少系統必須讀取的小型文件數量。我使用 Apache Beam 進行這種轉換。您可以在官方 TPU 存儲區找到讀取 JPEG 和編寫 TF 記錄的腳本。您可以在 Cloud Dataflow 上大規模地執行 Apache Beam 程序,但如果您的數據源目前不在 Google Cloud 上,則只能在大型 VM 上本地執行此程序(請務必用 pip 安裝 apache-beam)。
注:JPEG 和編寫 TF 記錄鏈接
https://github.com/tensorflow/tpu/blob/master/tools/datasets/jpeg_to_tf_record.py
TF 記錄是字典。對于圖像分類,上述管道編寫的兩個條目很重要,分別是:“image/class/label”(采用 int64)和 “image/encoded”(由 JPEG 文件內容組成)。
2.編寫輸入函數以讀取 TF 記錄
與任何估算器一樣,您需要編寫輸入函數,以讀取這些 TF 記錄。使用 Dataset API 可極大地簡化此任務,但還需注意幾個問題。在講解過程中,我會指出這些問題。
以下是我的輸入函數:
1def make_input_fn(pattern, mode, num_cores=8, transpose_input=False):
2def _set_shapes(batch_size, images, labels):
3"""Statically set the batch_size dimension."""
4if transpose_input:
5images.set_shape(images.get_shape().merge_with(
6tf.TensorShape([None, None, None, batch_size])))
7labels.set_shape(labels.get_shape().merge_with(
6tf.TensorShape([batch_size])))
9else:
10images.set_shape(images.get_shape().merge_with(
11tf.TensorShape([batch_size, None, None, None])))
12labels.set_shape(labels.get_shape().merge_with(
13tf.TensorShape([batch_size])))
14return images, labels
15
16def _input_fn(params):
17batch_size = params['batch_size']
18is_training = (mode == tf.estimator.ModeKeys.TRAIN)
19
20# read the dataset
21dataset = tf.data.Dataset.list_files(pattern, shuffle=is_training)
22if is_training:
23dataset = dataset.repeat()
24def fetch_dataset(filename):
25buffer_size = 8 * 1024 * 1024 # 8 MiB per file
26dataset = tf.data.TFRecordDataset(filename, buffer_size=buffer_size)
27return dataset
28dataset = dataset.apply(
29tf.contrib.data.parallel_interleave(
30fetch_dataset, cycle_length=64, sloppy=True))
31dataset = dataset.shuffle(1024)
32
33# augment and batch
34dataset = dataset.apply(
35tf.contrib.data.map_and_batch(
36read_and_preprocess, batch_size=batch_size,
37num_parallel_batches=num_cores, drop_remainder=True
38))
39
40 if transpose_input:
41 dataset = dataset.map(
42 lambda images, labels: (tf.transpose(images, [1, 2, 3, 0]), labels),
43 num_parallel_calls=num_cores)
44
45# assign static shape
46dataset = dataset.map(
47functools.partial(_set_shapes, batch_size)
48)
49
50# prefetch data while training
51dataset = dataset.prefetch(tf.contrib.data.AUTOTUNE)
52return dataset
53
54return _input_fn
請注意,輸入函數采用 params 參數。實際上,這將是傳遞至訓練程序的命令行參數,如此一來,我們便可提取有關數據集的詳情,例如訓練次數和評估圖像。
batch_size 很特別,因為 TPU 有多個核心,而 batch_size 由 TPU 估算器設置,且為有效批次大小。您必須完全返回 batch_size 記錄,而不能返回部分填充的批次。由于您會無限期地循環使用訓練數據,所以在訓練期間不會出現此問題。但這意味著最簡單的做法是將評估數據集向下舍入為核心數的倍數。如果核心數為 8,而評估數據集中有 1026 張圖像,您只能使用前 1024 張圖像進行評估。剩余的 2 張圖像則會舍棄。(我們也有方法在 Cloud TPU 中處理最后剩下的部分批次,我就不在此贅述。)
與任何分布式訓練一樣,您應確保每個工作器看到不同的數據子集,這是由所有文件的并行交錯及緩沖區本身內部的記錄重排進行處理。
圖像分類問題的一個常見需求是通過添加隨機裁剪及翻轉等方法來增強原始數據。我通過 read_and_preprocess 函數做到這一點。請注意,我將此函數應用于各個 TF 記錄并創建了 8 個并行批次,同時舍棄剩余的任何記錄(再次提醒,這在訓練期間不會造成任何影響,因為您會無限期重復進行訓練)。
接下來是轉置數據。事實證明,在 TPU 中轉置數據以保持批次大小可以極大地提高性能。因此,我們可以根據需要采取此做法。如果我們在 GPU 或 CPU 上運行程序,則 transpose_input 標記會變為 false。
TPU 需要靜態大小的張量。盡管我們已確保維持這種情況(通過舍棄剩余的批次),但仍需為核心 TensorFlow 編寫 Dataset API,這是更常見的做法。因此,我們調用一個函數,將數據集中的 batch_size 從 None 改為 batch_size。
最后的優化操作至關重要。我們需要預取數據。換句話說,當 TPU 處理一批記錄時,我們會通過 I/O 線程尋找并提取下一批次。如此一來,我們便可最大限度地利用 TPU(或 GPU),而且這對 CPU 沒有任何影響。
3.處理 TF 記錄
(上述)輸入函數會設置處理輸入內容的方式,但實際的解析操作還需由名為 read_and_preprocess() 的方法執行。此方法如下所示:
1def read_and_preprocess(example_data):
2parsed = tf.parse_single_example(example_data, {
3'image/encoded': tf.FixedLenFeature((), tf.string, ''),
4'image/class/label': tf.FixedLenFeature([], tf.int64, 1),
5})
6image_bytes = tf.reshape(parsed['image/encoded'], shape=[])
7label = tf.cast(
8tf.reshape(parsed['image/class/label'], shape=[]), dtype=tf.int32) - 1
9
10# end up with pixel values that are in the -1, 1 range
11image = tf.image.decode_jpeg(image_bytes, channels=NUM_CHANNELS)
12image = tf.image.convert_image_dtype(image, dtype=tf.float32) # 0-1
13image = tf.expand_dims(image, 0) # resize_bilinear needs batches
14
15image = tf.image.resize_bilinear(
16image, [HEIGHT + 10, WIDTH + 10], align_corners=False)
17image = tf.squeeze(image) # remove batch dimension
18image = tf.random_crop(image, [HEIGHT, WIDTH, NUM_CHANNELS])
19image = tf.image.random_flip_left_right(image)
20image = tf.image.random_brightness(image, max_delta=63.0 / 255.0)
21image = tf.image.random_contrast(image, lower=0.2, upper=1.8)
22
23
24#pixel values are in range [0,1], convert to [-1,1]
25image = tf.subtract(image, 0.5)
26image = tf.multiply(image, 2.0)
27return image, label
這里有兩個重要的注意事項。第一是使用 parse_single_example,因為我們是從 map() 調用此函數,所以會針對單個 TF 記錄調用。我們從記錄中提取相關信息(經過編碼的圖像和標簽),然后將其用于構建必要的張量。第二個注意事項是,這些數據必須為數值。比如,我無法傳回標簽字符串,因為 TPU 只能處理數值型數據。我們需要在預處理管道中計算標簽索引,標簽此時只會是整數。
4.提供輸入函數
訓練模型之后,您需要部署此模型,并通過 TF Serving 提供。以下代碼與您使用任何其他估算器時要用到的代碼相同:
1def serving_input_fn():
2# Note: only handles one image at a time
3feature_placeholders = {'image_bytes':
4tf.placeholder(tf.string, shape=())}
5image, _ = read_and_preprocess(
6tf.squeeze(feature_placeholders['image_bytes']))
7features = {
8'image': tf.expand_dims(image, 0)
9}
10return tf.estimator.export.ServingInputReceiver(features, feature_placeholders)
TPU 已針對批次推理進行優化;如果您的用例需要在線預測,目前最好是通過 CPU 或 GPU 提供(根據批次大小和模型復雜程度而定)。編寫輸入函數時,我假定自己只傳送一張圖像,所以實際上是指通過 CPU/GPU 提供。
5.模型函數
模型函數需要創建并返回 TPUEstimatorSpec。實現方式如下所示:
1def image_classifier(features, labels, mode, params):
2image = features
3if isinstance(features, dict):
4image = features['image']
5
6ylogits, nclasses = cnn_model(image, mode, params)
7
8probabilities = tf.nn.softmax(ylogits)
9class_int = tf.cast(tf.argmax(probabilities, 1), tf.int32)
10class_str = tf.gather(LIST_OF_LABELS, class_int)
11
12if mode == tf.estimator.ModeKeys.TRAIN or mode == tf.estimator.ModeKeys.EVAL:
13loss = tf.reduce_mean(tf.nn.softmax_cross_entropy_with_logits_v2(
14logits=ylogits, labels=tf.one_hot(labels, nclasses)))
15
16def metric_fn(class_int, labels):
17return {'accuracy': tf.metrics.accuracy(class_int, labels)}
18evalmetrics = (metric_fn, [class_int, labels])
19
20if mode == tf.estimator.ModeKeys.TRAIN:
21# this is needed for batch normalization, but has no effect otherwise
22update_ops = tf.get_collection(tf.GraphKeys.UPDATE_OPS)
23optimizer = tf.train.AdamOptimizer(learning_rate=params['learning_rate'])
24if params['use_tpu']:
25optimizer = tf.contrib.tpu.CrossShardOptimizer(optimizer) # TPU change 1
26with tf.control_dependencies(update_ops):
27train_op = optimizer.minimize(loss, tf.train.get_global_step())
28else:
29train_op = None
30else:
31loss = None
32train_op = None
33evalmetrics = None
34
35return tf.contrib.tpu.TPUEstimatorSpec( # TPU change 2
36mode=mode,
37predictions={"probabilities": probabilities,
38"classid": class_int, "class": class_str},
39loss=loss,
40train_op=train_op,
41eval_metrics=evalmetrics,
42export_outputs={'classes': tf.estimator.export.PredictOutput(
43{"probabilities": probabilities, "classid": class_int,
44"class": class_str})}
45)
所傳入的特征可能會是圖像(我的訓練和評估輸入函數)或字典(我的提供輸入函數)。我進行檢查并從特征中檢索圖像。
然后,我在圖像上調用實際模型數學函數。這應該是使用 tf.layers 的常見 TensorFlow 代碼。請瀏覽完整的源代碼以了解其形式。
由于這是分類問題,所以我使用 softmax 并根據各個類別的 logit 計算整數標簽和字符串標簽,之后使用了 argmax 和 gather。我還計算了交叉熵損失,這與任何其他估算器類似。
其中一個區別在于,一般的估算器需要將評估指標用作字典,而 TPUEstimator 需要能夠在控制 CPU 或 TPU 上調用的函數。因此,指定 eval 指標的方式稍有不同。
如果您使用 TPU,則所用的優化器必須包裝在 CrossShardOptimizer 中。這樣可以在不同核心中分配優化任務。
訓練操作就是將此交叉碎片優化損失最小化。請使用 optimizer.minimize(),而非 layers.optimize_loss()。
將上述所有操作整合在一起后,系統會返回 TPU 估算器規范。
6.訓練和評估循環
您可能很熟悉估算器的 train_and_evaluate 循環。可惜此循環(尚)無法與 TPU 有效配合使用。幸運的是,創建您自己的循環并不太難,這讓您可以更好地控制檢查頻率和內容(回想這樣的情景,您想盡可能減少過于頻繁的檢查導致的環境切換和 I/O 開銷)。
1def train_and_evaluate(output_dir, hparams):
2STEPS_PER_EVAL = 1000
3max_steps = hparams['train_steps']
4eval_batch_size = min(1024, hparams['num_eval_images'])
5eval_batch_size = eval_batch_size - eval_batch_size % 8 # divisible by num_cores
6tf.logging.info('train_batch_size=%d eval_batch_size=%d max_steps=%d',
7hparams['train_batch_size'],
8eval_batch_size,
9max_steps)
10
11# TPU change 3
12if hparams['use_tpu']:
13tpu_cluster_resolver = tf.contrib.cluster_resolver.TPUClusterResolver(
14hparams['tpu'],
15zone=hparams['tpu_zone'],
16project=hparams['project'])
17config = tf.contrib.tpu.RunConfig(
18cluster=tpu_cluster_resolver,
19model_dir=output_dir,
20save_checkpoints_steps=STEPS_PER_EVAL,
21tpu_config=tf.contrib.tpu.TPUConfig(
22iterations_per_loop=STEPS_PER_EVAL,
23per_host_input_for_training=True))
24else:
25config = tf.contrib.tpu.RunConfig()
26
27estimator = tf.contrib.tpu.TPUEstimator( # TPU change 4
28model_fn=image_classifier,
29config=config,
30params=hparams,
31model_dir=output_dir,
32train_batch_size=hparams['train_batch_size'],
33eval_batch_size=eval_batch_size,
34use_tpu=hparams['use_tpu']
35)
首先,提取一些命令行參數,并用其指定最大步數以及訓練和評估的批次大小。
接下來是尋找 TPU。如果您已在 Google 計算引擎上自行創建了 Cloud TPU,可能已為其命名。假設此名稱(“tpu”)作為命令行參數傳入。如果您使用 Cloud ML Engine,系統會自動推斷 TPU 名稱和區域等內容。請務必僅在已設置 use_tpu 標記的情況下執行此操作。如果用戶是在 CPU 或 GPU 上運行程序,則只需創建空白 RunConfig。
接下來,使用模型函數、配置、參數和批次大小創建 TPUEstimator。創建估算器后,我們便可進入真實訓練和評估循環:
1# load last checkpoint and start from there
2current_step = load_global_step_from_checkpoint_dir(output_dir)
3steps_per_epoch = hparams['num_train_images'] // hparams['train_batch_size']
4tf.logging.info('Training for %d steps (%.2f epochs in total). Current'
5' step %d.',
6max_steps,
7max_steps / steps_per_epoch,
8current_step)
9
10start_timestamp = time.time() # This time will include compilation time
11
12while current_step < hparams['train_steps']:
13# Train for up to steps_per_eval number of steps.
14# At the end of training, a checkpoint will be written to --model_dir.
15next_checkpoint = min(current_step + STEPS_PER_EVAL, max_steps)
16estimator.train(input_fn=train_input_fn, max_steps=next_checkpoint)
17current_step = next_checkpoint
18tf.logging.info('Finished training up to step %d. Elapsed seconds %d.',
19next_checkpoint, int(time.time() - start_timestamp))
20
21# Evaluate the model on the most recent model in --model_dir.
22# Since evaluation happens in batches of --eval_batch_size, some images
23# may be excluded modulo the batch size. As long as the batch size is
24# consistent, the evaluated images are also consistent.
25tf.logging.info('Starting to evaluate at step %d', next_checkpoint)
26eval_results = estimator.evaluate(
27input_fn=eval_input_fn,
28steps=hparams['num_eval_images'] // eval_batch_size)
29tf.logging.info('Eval results at step %d: %s', next_checkpoint, eval_results)
30
31elapsed_time = int(time.time() - start_timestamp)
32tf.logging.info('Finished training up to step %d. Elapsed seconds %d.',
33max_steps, elapsed_time)
TensorFlow 估算器的運作方式是從先前已有的檢查點執行暖啟動。我們可以加載輸出目錄中提供的檢查點,以進行復制。然后,我們會一次性逐步完成訓練數據 train_batch_size 步驟,直至達到所指定的最大步數。
在本文的例子中,我對完整評估數據集中的每個檢查點都進行了評估,但顯然,您可以減少此訓練的計算量。
7.導出模型以供使用
最后,在完成訓練后,我導出已保存的模型。您可以使用 TF Serving 或 Cloud ML Engine 來部署已保存的模型,以進行預測。
1# export similar to Cloud ML Engine / TF Serving convention
2tf.logging.info('Starting to export model.')
3estimator.export_savedmodel(
4export_dir_base=os.path.join(output_dir, 'export/exporter'),
5serving_input_receiver_fn=serving_input_fn)
此時,我們便有了一個可以在 Cloud TPU 上訓練的自定義估算器模型。采用這種方式編寫模型(例如使用 use_tpu 標記并提供轉置為可選項),同樣的代碼也支持各種不同的硬件,包括 CPU 和 GPU 在內。因此,我們的估算器模型實際可用于全部的三類硬件。
后續步驟:
-
從 GitHub 下載本文隨附的代碼,然后進行試用
-
運行代碼實驗室,了解如何在 TPU 上運行 ResNet訓練您自己的數據(無需編寫任何代碼)
注:代碼鏈接
https://github.com/GoogleCloudPlatform/training-data-analyst/tree/master/courses/machine_learning/deepdive/08_image/flowersmodeltpu
在 TPU 上運行 ResNet 鏈接
https://codelabs.developers.google.com/codelabs/tpu-resnet/#0
在 Coursera 上參加使用 TensorFlow 進行機器學習專業課程;此課程會逐步介紹 TensorFlow 概念,以及如何在 Google Cloud 上大規模地訓練、調整和部署 ML 模型。
-
存儲區域
+關注
關注
0文章
9瀏覽量
7155 -
TPU
+關注
關注
0文章
141瀏覽量
20730 -
Cloud
+關注
關注
0文章
68瀏覽量
5356
原文標題:如何為 Cloud TPU 編寫自定義估算器模型
文章出處:【微信號:tensorflowers,微信公眾號:Tensorflowers】歡迎添加關注!文章轉載請注明出處。
發布評論請先 登錄
相關推薦
評論