Frequently used TensorFlow code snippets.
General steps
Create Tensors (variables) that are not yet executed/evaluated.
Write operations between those Tensors.
Initialize your Tensors.
Create a Session.
Run the Session. This will run the operations you've written above (a minimal sketch of these steps follows this list).
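A minimal sketch of these steps, assuming the TF1-style session API (tf.compat.v1 in TF 2.x):

import tensorflow as tf

# 1. Create tensors that are not yet evaluated
a = tf.constant(2, name='a')
b = tf.constant(3, name='b')
# 2. Write operations between those tensors
c = tf.add(a, b)
# 3. Initialize the tensors and 4. create a session
init = tf.global_variables_initializer()
with tf.Session() as sess:
    # 5. Run the session, which executes the operations written above
    sess.run(init)
    print(sess.run(c))  # 5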
One-liners
tf.sigmoid
tf.nn.sigmoid_cross_entropy_with_logits
tf.ones
tf.zeros_initializer
tf.contrib.layers.xavier_initializer(seed = 1)
tf.nn.relu
tf.add
tf.matmul
tf.transpose
Recipes
Importing
import tensorflow as tf
Set different types of values
c = tf.constant(12, name="c")
x = tf.get_variable("x", initializer=c**2)  # variable; could instead have shape=[x, y] and an initializer such as tf.zeros_initializer()
y = tf.placeholder(tf.int64, name="y")  # placeholder: a value you specify at session execution time with the parameter feed_dict={y: 3}
# A placeholder could also have a shape such as shape=[n_x, None]
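A small usage sketch of the placeholder above, assuming the TF1 session API (the value 3 is only illustrative):

with tf.Session() as sess:
    # the placeholder value is supplied only at execution time
    print(sess.run(2 * y, feed_dict={y: 3}))  # 6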
Initialize variables
tf.reset_default_graph()  # resetting the default graph beforehand is good practice
init = tf.global_variables_initializer()
with tf.Session() as sess:
    sess.run(init)
# if not used with 'with', it is necessary to call sess.close()
Clean memory from device
# pip install numba
from numba import cuda

device = cuda.get_current_device()
device.reset()
Computing cost for sigmoid
cost = tf.nn.sigmoid_cross_entropy_with_logits(logits=z, labels=y)  # z = \hat{y} (the logits) and y = the true label
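The call above returns one loss value per example; a hedged follow-up, assuming the usual intent of averaging the losses over the batch:

cost = tf.reduce_mean(tf.nn.sigmoid_cross_entropy_with_logits(logits=z, labels=y))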
One-hot encoding
one_hot_matrix = tf.one_hot(labels, C, axis=0)  # C is the number of classes, labels is a vector of labels
max_pool
Use: Given an input A it performs a pooling layer with a window of size (f, f); i.e. it usually operates on one example and one channel at a time.
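The call for this entry appears to have been lost; a likely sketch, assuming the TF1 tf.nn.max_pool signature with a window size f and stride s (both hypothetical hyperparameter names here):

tf.nn.max_pool(A, ksize=[1, f, f, 1], strides=[1, s, s, 1], padding='SAME')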
flatten
tf.contrib.layers.flatten(P)
Use: Given an input tensor P it takes each example from the batch and generates a 1D array as output. For example, given a tensor of shape [batch_size, width, height, channels] it returns a tensor of shape [batch_size, width x height x channels].
fully_connected
tf.contrib.layers.fully_connected(F, num_outputs)
Use: Given a (flattened) input tensor F it creates a layer whose weights are initialized in the graph, so they don't need to be initialized separately. This layer needs the additional argument activation_fn=None so that no activation is applied and the raw logits can be passed to a softmax cost later.
"Logits" are the result of multiplying the weights and adding the biases. Logits are passed through an activation function (such as a relu), and the result is called the "activation."
Working example code for a TF project is at docs/career/convnet_course
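A tiny sketch of the logits/activation distinction above, using the one-liners listed earlier (W, X and b are hypothetical tensors for illustration):

Z = tf.add(tf.matmul(W, X), b)  # logits: weights times inputs plus biases
A = tf.nn.relu(Z)               # activation: logits passed through relu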
Images
Read functions
import numpy as np
from tensorflow.python.keras.preprocessing.image import load_img, img_to_array

imgs = [load_img(img_path, target_size=(img_height, img_width)) for img_path in img_paths]
img_array = np.array([img_to_array(img) for img in imgs])
ResNet50 preprocessing
from tensorflow.python.keras.applications.resnet50 import preprocess_input

output = preprocess_input(img_array)
Utils
from keras.applications.resnet50 import decode_predictions

decode_predictions(preds, top=3)  # preds is the output of model.predict
Display on notebook
from IPython.display import Image, display

display(Image(img_path))
Transfer learning example
from tensorflow.python.keras.applications import ResNet50
from tensorflow.python.keras.models import Sequential
from tensorflow.python.keras.layers import Dense, Flatten, GlobalAveragePooling2D

num_classes = 2
resnet_weights_path = '../input/resnet50/resnet50_weights_tf_dim_ordering_tf_kernels_notop.h5'

my_new_model = Sequential()
my_new_model.add(ResNet50(include_top=False, pooling='avg', weights=resnet_weights_path))
my_new_model.add(Dense(num_classes, activation='softmax'))

# Say not to train the first layer (ResNet); it is already trained
my_new_model.layers[0].trainable = False
Feeding data into models with ImageDataGenerator
from tensorflow.python.keras.applications.resnet50 import preprocess_input
from tensorflow.python.keras.preprocessing.image import ImageDataGenerator

image_size = 224
data_generator = ImageDataGenerator(preprocessing_function=preprocess_input)

# having 72 images for training and 20 for validation
train_generator = data_generator.flow_from_directory(
    '../input/urban-and-rural-photos/rural_and_urban_photos/train',
    target_size=(image_size, image_size),
    batch_size=24,
    class_mode='categorical')
validation_generator = data_generator.flow_from_directory(
    '../input/urban-and-rural-photos/rural_and_urban_photos/val',
    target_size=(image_size, image_size),
    class_mode='categorical')

my_new_model.fit_generator(
    train_generator,
    steps_per_epoch=3,
    validation_data=validation_generator,
    validation_steps=1)
TPU usage
Works for colab
%tensorflow_version 2.x
import tensorflow as tf

try:
    tpu = tf.distribute.cluster_resolver.TPUClusterResolver()  # TPU detection
    print('Running on TPU ', tpu.cluster_spec().as_dict()['worker'])
except ValueError:
    raise BaseException('ERROR: Not connected to a TPU runtime; please see the previous cell in this notebook for instructions!')
tf.config.experimental_connect_to_cluster(tpu)
tf.tpu.experimental.initialize_tpu_system(tpu)
tpu_strategy = tf.distribute.experimental.TPUStrategy(tpu)
When training:
with tpu_strategy.scope():
    full_model = create_model(max_words, embedding_dim, max_len, embedding_matrix)
    history = full_model.fit(X_train, Y_train, epochs=14)
Tensorflow data validation (TFDV)
import tensorflow as tf
import tensorflow_data_validation as tfdv
import pandas as pd
from tensorflow_metadata.proto.v0 import schema_pb2

print('TFDV Version: {}'.format(tfdv.__version__))
print('Tensorflow Version: {}'.format(tf.__version__))
Generate statistics
# Generate training dataset statistics
train_stats = tfdv.generate_statistics_from_dataframe(train_df)
# To restrict which columns metrics are calculated on, pass stats options instead:
# stats_options = tfdv.StatsOptions(feature_whitelist=approved_cols)
# train_stats = tfdv.generate_statistics_from_dataframe(train_df, stats_options=stats_options)
Visualize statistics
# Visualize training dataset statistics
tfdv.visualize_statistics(train_stats)
Infer data schema
# Infer a schema from the computed statistics
schema = tfdv.infer_schema(statistics=train_stats)
# Display the inferred schema
tfdv.display_schema(schema)
Comparing stats from training/evaluation
# Generate evaluation dataset statistics
eval_stats = tfdv.generate_statistics_from_dataframe(eval_df)

# Compare training with evaluation
tfdv.visualize_statistics(
    lhs_statistics=eval_stats,
    rhs_statistics=train_stats,
    lhs_name='EVAL_DATASET',
    rhs_name='TRAIN_DATASET')
Calculate and display anomalies
# Check evaluation data for errors by validating the evaluation dataset statistics against the reference schema
anomalies = tfdv.validate_statistics(statistics=eval_stats, schema=schema)
# Visualize anomalies
tfdv.display_anomalies(anomalies)
Fix anomalies in the schema
# Relax the minimum fraction of values that must come from the domain for the feature `native-country`
country_feature = tfdv.get_feature(schema, 'native-country')
country_feature.distribution_constraints.min_domain_mass = 0.9

# Relax the minimum fraction of values that must come from the domain for the feature `occupation`
occupation_feature = tfdv.get_feature(schema, 'occupation')
occupation_feature.distribution_constraints.min_domain_mass = 0.9
More flexible extension of schema
# Add a new value to the domain of the feature `race`
race_domain = tfdv.get_domain(schema, 'race')
race_domain.value.append('Asian')

# Or completely substitute a feature's domain with the domain of another
tfdv.set_domain(schema, feature, to_domain_name)
Manually set the range for int values
# Restrict the range of the `age` feature
tfdv.set_domain(schema, 'age', schema_pb2.IntDomain(name='age', min=17, max=90))
# Display the modified schema. Notice the `Domain` column of `age`.
tfdv.display_schema(schema)
Data environments
schema.default_environment.append('TRAINING')
schema.default_environment.append('SERVING')
# If we want to remove a feature from a given environment (example)
tfdv.get_feature(schema, 'readmitted').not_in_environment.append('SERVING')
Data drift and skew
diabetes_med = tfdv.get_feature(schema, 'diabetesMed')
# domain knowledge helps to determine this threshold
diabetes_med.skew_comparator.infinity_norm.threshold = 0.03

skew_drift_anomalies = tfdv.validate_statistics(
    train_stats, schema,
    previous_statistics=eval_stats,
    serving_statistics=serving_stats)
tfdv.display_anomalies(skew_drift_anomalies)
Freeze schema
import os

schema_file = os.path.join(OUTPUT_DIR, 'schema.pbtxt')
# write_schema_text expects the schema and the output path as parameters
tfdv.write_schema_text(schema, schema_file)
Data slicing
from tensorflow_data_validation.utils import slicing_util

# features={'sex': [b'Male']}  # or this, for example, to slice on a specific value (note the b'')
slice_fn = slicing_util.get_feature_value_slicer(features={'sex': None})

# Declare stats options
slice_stats_options = tfdv.StatsOptions(schema=schema,
                                        slice_functions=[slice_fn],
                                        infer_type_from_schema=True)

# Convert the dataframe to CSV since `slice_functions` only works with `tfdv.generate_statistics_from_csv`
CSV_PATH = 'slice_sample.csv'
train_df.to_csv(CSV_PATH)

# Calculate statistics for the sliced dataset
sliced_stats = tfdv.generate_statistics_from_csv(CSV_PATH, stats_options=slice_stats_options)

from tensorflow_metadata.proto.v0.statistics_pb2 import DatasetFeatureStatisticsList
# An important caveat: visualize_statistics() accepts a DatasetFeatureStatisticsList rather than a DatasetFeatureStatistics,
# so (at least for this version of TFDV) the sliced statistics need to be converted to the correct type.
# Convert the `Male` statistics (index=1) to the correct type and get the dataset name
male_stats_list = DatasetFeatureStatisticsList()
male_stats_list.datasets.extend([sliced_stats.datasets[1]])
male_stats_name = sliced_stats.datasets[1].name

# Convert the `Female` statistics (index=2) to the correct type and get the dataset name
female_stats_list = DatasetFeatureStatisticsList()
female_stats_list.datasets.extend([sliced_stats.datasets[2]])
female_stats_name = sliced_stats.datasets[2].name

# Visualize the two slices side by side
tfdv.visualize_statistics(
    lhs_statistics=male_stats_list,
    rhs_statistics=female_stats_list,
    lhs_name=male_stats_name,
    rhs_name=female_stats_name)
Generate tensorflow data directory
import os
import shutil
import csv
import pandas as pd
from sklearn.model_selection import train_test_split


def text_dataframe_to_tf_dir(df: pd.DataFrame,
                             target_dir: str,
                             label_col_ix: int = -1,
                             value_col_ix: int = 0,
                             train_split: float = 0.7,
                             validation_split: float = 0.15,
                             test_split: float = 0.15) -> None:
    """Generate training data in the tensorflow directory format.

    The generated directory is meant to be loaded from tensorflow/keras (see below).

    Args:
        (pd.DataFrame) df: Dataframe containing the text data for the model in the format of 'value\tlabel'.
        (str) target_dir: Target directory to place the formatted data.
        (int) label_col_ix: Label column to use for the directory generation.
        (int) value_col_ix: Value column to be written in the text files.
        (float) train_split: Percentage of data to be placed in the train subdir.
        (float) validation_split: Percentage of data to be placed in the validation subdir,
            only used if test_split and validation_split are not None.
        (float) test_split: Percentage of data to be placed in the test subdir.

    Returns:
        (None)

    Raises:
        IndexError: Column index for the label or value column is out of bounds.
    """
    try:
        label_col = df.columns[label_col_ix]
    except IndexError:
        raise IndexError(f'Column index "{label_col_ix}" is out of boundaries.')
    try:
        value_col = df.columns[value_col_ix]
    except IndexError:
        raise IndexError(f'Column index "{value_col_ix}" is out of boundaries.')

    if os.path.exists(target_dir):
        shutil.rmtree(target_dir)
    os.makedirs(target_dir)

    if train_split is not None:
        if test_split is not None:
            if validation_split is not None:
                val_test_split = validation_split + test_split
                train_set, test_set = train_test_split(
                    df, test_size=val_test_split, random_state=42, stratify=df[label_col])
                validation_split = validation_split / val_test_split
                test_set, validation_set = train_test_split(
                    test_set, test_size=validation_split, random_state=42, stratify=test_set[label_col])
                splits = [('train', train_set), ('test', test_set), ('validation', validation_set)]
            else:
                train_set, test_set = train_test_split(
                    df, test_size=test_split, random_state=42, stratify=df[label_col])
                splits = [('train', train_set), ('test', test_set)]
        else:
            splits = [('train', df)]
    else:
        return

    for split_name, split_data in splits:
        print(f'Processing split "{split_name}"')
        split_dir = os.path.join(target_dir, split_name)
        os.makedirs(split_dir)
        for name, label_data in split_data.groupby(label_col):
            print(f'Processing label "{name}" for split {split_name}')
            label_dir = os.path.join(split_dir, str(name))
            os.makedirs(label_dir)
            for ix, obs in enumerate(label_data[value_col].tolist()):
                with open(os.path.join(label_dir, f'{name}_{ix}.txt'), 'w') as fl:
                    fl.write(obs)
The directory generated by the snippet above is meant to be loaded with keras as follows:
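The loading code itself is not in these notes; a minimal sketch, assuming TF 2.3+ (where text_dataset_from_directory is available) and a hypothetical target_dir laid out by the function above:

import tensorflow as tf

# target_dir/train/<label>/<label>_<ix>.txt as written by text_dataframe_to_tf_dir
train_ds = tf.keras.preprocessing.text_dataset_from_directory(
    'target_dir/train', batch_size=32, seed=42)
valid_ds = tf.keras.preprocessing.text_dataset_from_directory(
    'target_dir/validation', batch_size=32, seed=42)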
Define dataset metadata with tensorflow transform (tft)
import tensorflow as tf
from tensorflow_transform.tf_metadata import dataset_metadata
from tensorflow_transform.tf_metadata import schema_utils

# define the schema as a DatasetMetadata object
raw_data_metadata = dataset_metadata.DatasetMetadata(
    # use the convenience function to build a Schema protobuf
    schema_utils.schema_from_feature_spec({
        # a dictionary mapping each key to its feature spec type
        'y': tf.io.FixedLenFeature([], tf.float32),
        'x': tf.io.FixedLenFeature([], tf.float32),
        's': tf.io.FixedLenFeature([], tf.string),
    }))
Sample preprocessing function
import tensorflow_transform as tft


def preprocessing_fn(inputs):
    """Preprocess input columns into transformed columns."""
    # extract the columns and assign them to local variables
    x = inputs['x']
    y = inputs['y']
    s = inputs['s']

    # data transformations using tft functions
    x_centered = x - tft.mean(x)
    y_normalized = tft.scale_to_0_1(y)
    s_integerized = tft.compute_and_apply_vocabulary(s)
    x_centered_times_y_normalized = x_centered * y_normalized

    # return the transformed data
    return {
        'x_centered': x_centered,
        'y_normalized': y_normalized,
        's_integerized': s_integerized,
        'x_centered_times_y_normalized': x_centered_times_y_normalized,
    }
Generate a constant graph with the required transformations
import tempfile
import tensorflow_transform.beam as tft_beam

# Ignore the warnings
tf.get_logger().setLevel('ERROR')

# a temporary directory is needed when analyzing the data
with tft_beam.Context(temp_dir=tempfile.mkdtemp()):
    # define the pipeline using Apache Beam syntax:
    # analyze and transform the dataset using the preprocessing function
    transformed_dataset, transform_fn = (
        (raw_data, raw_data_metadata) | tft_beam.AnalyzeAndTransformDataset(preprocessing_fn)
    )

# unpack the transformed dataset
transformed_data, transformed_metadata = transformed_dataset
Run a TFX pipeline interactively
# Initialize the InteractiveContext with a local sqlite file.
# If you leave `_pipeline_root` blank, the db will be created in a temporary directory.
from tfx.orchestration.experimental.interactive.interactive_context import InteractiveContext

context = InteractiveContext(pipeline_root=_pipeline_root)
Read a CSV file
The input data (_data_root here) is CSV; ExampleGen also supports TFRecord and BigQuery sources.
from tfx.components import CsvExampleGen

# Instantiate ExampleGen with the input CSV dataset
example_gen = CsvExampleGen(input_base=_data_root)
context.run(example_gen)
Inspect generated artifact
Each run stays associated with an execution ID, which is useful for debugging.
# get the artifact object
artifact = example_gen.outputs['examples'].get()[0]
# print split names and uri
print(f'split names: {artifact.split_names}')
print(f'artifact uri: {artifact.uri}')
Read and print TFRecord files
train_uri = os.path.join(artifact.uri, 'train')

# Get the list of files in this directory (all compressed TFRecord files)
tfrecord_filenames = [os.path.join(train_uri, name) for name in os.listdir(train_uri)]

# Create a `TFRecordDataset` to read these files
dataset = tf.data.TFRecordDataset(tfrecord_filenames, compression_type="GZIP")
from google.protobuf.json_format import MessageToDict


def get_records(dataset, num_records):
    '''Extracts records from the given dataset.

    Args:
        dataset (TFRecordDataset): dataset saved by ExampleGen
        num_records (int): number of records to preview
    '''
    # initialize an empty list
    records = []
    # Use the `take()` method to specify how many records to get
    for tfrecord in dataset.take(num_records):
        # Get the numpy property of the tensor
        serialized_example = tfrecord.numpy()
        # Initialize a `tf.train.Example()` to read the serialized data
        example = tf.train.Example()
        # Read the example data (output is a protocol buffer message)
        example.ParseFromString(serialized_example)
        # Convert the protocol buffer message to a Python dictionary
        example_dict = MessageToDict(example)
        # Append to the records list
        records.append(example_dict)
    return records
Sample usage
import pprint

pp = pprint.PrettyPrinter()

# Get 3 records from the dataset
sample_records = get_records(dataset, 3)
# Print the output
pp.pprint(sample_records)
Generate statistics for a given dataset
from tfx.components import StatisticsGen

# Instantiate StatisticsGen with the ExampleGen ingested dataset
statistics_gen = StatisticsGen(examples=example_gen.outputs['examples'])  # example_gen from above

# Execute the component
context.run(statistics_gen)
Infer a schema for a given dataset
from tfx.components import SchemaGen

# Instantiate SchemaGen with the StatisticsGen ingested dataset
schema_gen = SchemaGen(statistics=statistics_gen.outputs['statistics'])

# Run the component
context.run(schema_gen)
Show schema
context.show(schema_gen.outputs['schema'])
Detect anomalies for a given dataset
from tfx.components import ExampleValidator

# Instantiate ExampleValidator with the StatisticsGen and SchemaGen ingested data
example_validator = ExampleValidator(
statistics=statistics_gen.outputs['statistics'],
schema=schema_gen.outputs['schema'])
# Run the component.
context.run(example_validator)
Transformations need to be passed to TFX as modules. A common pattern is to have a constants file as follows:
# Features with string data types that will be converted to indices
CATEGORICAL_FEATURE_KEYS = [
'education', 'marital-status', 'occupation', 'race', 'relationship', 'workclass', 'sex', 'native-country'
]
# Numerical features that are marked as continuous
NUMERIC_FEATURE_KEYS = ['fnlwgt', 'education-num', 'capital-gain', 'capital-loss', 'hours-per-week']
# Feature that can be grouped into buckets
BUCKET_FEATURE_KEYS = ['age']
# Number of buckets used by tf.transform for encoding each bucket feature.
FEATURE_BUCKET_COUNT = {'age': 4}
# Feature that the model will predict
LABEL_KEY = 'label'
# Utility function for renaming the feature
def transformed_name(key):
return key + '_xf'
Then, having the following sample preprocessing function in a module file,
import tensorflow as tf
import tensorflow_transform as tft
import census_constants # this is the constants file from above
# Unpack the contents of the constants module
_NUMERIC_FEATURE_KEYS = census_constants.NUMERIC_FEATURE_KEYS
_CATEGORICAL_FEATURE_KEYS = census_constants.CATEGORICAL_FEATURE_KEYS
_BUCKET_FEATURE_KEYS = census_constants.BUCKET_FEATURE_KEYS
_FEATURE_BUCKET_COUNT = census_constants.FEATURE_BUCKET_COUNT
_LABEL_KEY = census_constants.LABEL_KEY
_transformed_name = census_constants.transformed_name
# Define the transformations
def preprocessing_fn(inputs):
"""tf.transform's callback function for preprocessing inputs.
Args:
inputs: map from feature keys to raw not-yet-transformed features.
Returns:
Map from string feature key to transformed feature operations.
"""
outputs = {}
# Scale these features to the range [0,1]
for key in _NUMERIC_FEATURE_KEYS:
outputs[_transformed_name(key)] = tft.scale_to_0_1(
inputs[key])
# Bucketize these features
for key in _BUCKET_FEATURE_KEYS:
outputs[_transformed_name(key)] = tft.bucketize(
inputs[key], _FEATURE_BUCKET_COUNT[key],
always_return_num_quantiles=False)
# Convert strings to indices in a vocabulary
for key in _CATEGORICAL_FEATURE_KEYS:
outputs[_transformed_name(key)] = tft.compute_and_apply_vocabulary(inputs[key])
# Convert the label strings to an index
outputs[_transformed_name(_LABEL_KEY)] = tft.compute_and_apply_vocabulary(inputs[_LABEL_KEY])
return outputs
we pass it to the Transform component as follows:
from tfx.components import Transform
# Ignore TF warning messages
tf.get_logger().setLevel('ERROR')
# Instantiate the Transform component
transform = Transform(
examples=example_gen.outputs['examples'],
schema=schema_gen.outputs['schema'],
module_file=os.path.abspath(_census_transform_module_file))
# Run the component
context.run(transform)
This execution will produce (in .component.outputs):
transform_graph is the graph that can perform the preprocessing operations. This graph will be included during training and serving to ensure consistent transformations of incoming data.
transformed_examples points to the preprocessed training and evaluation data.
updated_analyzer_cache holds stored calculations from previous runs.
transform_graph, for example, would contain (in transform.outputs['transform_graph'].get()[0].uri; a quick check is sketched after this list):
The metadata subdirectory contains the schema of the original data.
The transformed_metadata subdirectory contains the schema of the preprocessed data.
The transform_fn subdirectory contains the actual preprocessing graph.
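A quick way to confirm that layout (a sketch, not part of the pipeline itself, assuming the same interactive context as above):

import os

transform_graph_uri = transform.outputs['transform_graph'].get()[0].uri
print(os.listdir(transform_graph_uri))  # expect: ['metadata', 'transformed_metadata', 'transform_fn']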
A sample of transformed data can be retrieved with
train_uri = os.path.join(transform.outputs['transformed_examples'].get()[0].uri, 'train')
# Get the list of files in this directory (all compressed TFRecord files)
tfrecord_filenames = [os.path.join(train_uri, name)
for name in os.listdir(train_uri)]
# Create a `TFRecordDataset` to read these files
transformed_dataset = tf.data.TFRecordDataset(tfrecord_filenames, compression_type="GZIP")
# Get 3 records from the dataset
sample_records_xf = get_records(transformed_dataset, 3)
# Print the output
pp.pprint(sample_records_xf)