
Commit 30def1a

tf-transform-team authored and zoyahav committed
Project import generated by Copybara.
PiperOrigin-RevId: 186310133
1 parent cdb30c5 commit 30def1a

File tree

13 files changed: +370 −287 lines changed

CONTRIBUTING.md

Lines changed: 1 addition & 1 deletion
@@ -18,7 +18,7 @@ Follow either of the two links above to access the appropriate CLA and instructi
 ### Contributing code
 
 If you have improvements to TensorFlow Transform, send us your pull requests!
-For those just getting started, Github has a [howto](https://help.github.com/articles/using-pull-requests/).
+For those just getting started, GitHub has a [howto](https://help.github.com/articles/using-pull-requests/).
 
 If you want to contribute but you're not sure where to start, take a look at the
 [issues with the "contributions welcome" label](https://github.com/tensorflow/transform/labels/contributions%20welcome).

README.md

Lines changed: 9 additions & 12 deletions
@@ -1,8 +1,9 @@
-# tf.Transform
+# tf.Transform [![PyPI](https://img.shields.io/pypi/pyversions/tensorflow-transform.svg?style=plastic)](https://github.com/tensorflow/transform)
 
-**tf.Transform** is a library for doing data preprocessing with TensorFlow. It
-allows users to combine various data processing frameworks (currently Apache
-Beam is supported but tf.Transform can be extended to support other frameworks),
+**tf.Transform** is a library for doing data preprocessing with
+[TensorFlow](https://www.tensorflow.org). It allows users to combine various
+data processing frameworks (currently [Apache Beam](https://beam.apache.org/) is
+supported but tf.Transform can be extended to support other frameworks),
 with TensorFlow, to transform data. Because tf.Transform is built on TensorFlow,
 it allows users to export a graph which re-creates the transformations they did
 to their data as a TensorFlow graph. This is important as the user can then
@@ -39,13 +40,9 @@ tf.Transform does though have a dependency on the GCP distribution of Apache
 Beam. Apache Beam is the framework used to run distributed pipelines. Apache
 Beam is able to run pipelines in multiple ways, depending on the "runner" used,
 and the "runner" is usually provided by a distribution of Apache
-Beam. With the GCP distribution of Apache Beam, one can run beam pipelines
-locally, or on Google Cloud Dataflow.
-
-Note: If you clone tf.Transform's implementation and samples from GitHub's
-`master` branch (as opposed to using the released implementation and samples
-from PyPI) they will likely only work with TensorFlow's nightly
-[build](https://github.com/tensorflow/tensorflow).
+Beam. With the GCP distribution of Apache Beam, one can run Apache Beam
+pipelines locally, or on
+[Google Cloud Dataflow](https://cloud.google.com/dataflow/).
 
 ### Compatible Versions
 
@@ -56,7 +53,7 @@ releasing a new version.
 
 |tensorflow-transform                                                             |tensorflow    |apache-beam[gcp]|
 |---------------------------------------------------------------------------------|--------------|----------------|
-|[GitHub master](https://github.com/tensorflow/transform/blob/master/RELEASE.md)  |nightly (1.x) |2.2.0           |
+|[GitHub master](https://github.com/tensorflow/transform/blob/master/RELEASE.md)  |nightly (1.x) |2.3.0           |
 |[0.4.0](https://github.com/tensorflow/transform/blob/v0.4.0/RELEASE.md)          |1.4           |2.2.0           |
 |[0.3.1](https://github.com/tensorflow/transform/blob/v0.3.1/RELEASE.md)          |1.3           |2.1.1           |
 |[0.3.0](https://github.com/tensorflow/transform/blob/v0.3.0/RELEASE.md)          |1.3           |2.1.1           |
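For orientation, a minimal end-to-end sketch of the workflow the README describes, modeled on this repo's simple example and the test code in this commit (the feature name 'x' and the data are illustrative, not from the source):

import tempfile

import tensorflow as tf
import tensorflow_transform as tft
import tensorflow_transform.beam.impl as beam_impl
from tensorflow_transform.tf_metadata import dataset_metadata
from tensorflow_transform.tf_metadata import dataset_schema as sch

def preprocessing_fn(inputs):
  # scale_to_0_1 needs the global min and max, so tf.Transform runs an
  # analysis pass over the dataset in Beam, then bakes the results into an
  # exportable TensorFlow graph.
  return {'x_scaled': tft.scale_to_0_1(inputs['x'])}

raw_data = [{'x': 1.0}, {'x': 2.0}, {'x': 3.0}]
raw_metadata = dataset_metadata.DatasetMetadata({
    'x': sch.ColumnSchema(tf.float32, [], sch.FixedColumnRepresentation())})

with beam_impl.Context(temp_dir=tempfile.mkdtemp()):
  (transformed_data, _), transform_fn = (
      (raw_data, raw_metadata)
      | beam_impl.AnalyzeAndTransformDataset(preprocessing_fn))
# transformed_data: [{'x_scaled': 0.0}, {'x_scaled': 0.5}, {'x_scaled': 1.0}]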

RELEASE.md

Lines changed: 30 additions & 26 deletions
@@ -1,38 +1,42 @@
 # Current version (not yet released; still in development)
 
 ## Major Features and Improvements
-* Batching of input instances is now done automatically and dynamically.
-* Added analyzers to compute covariance matrices (`tft.covariance`) and
-  principal components for PCA (`tft.pca`).
-* CombinerSpec and combine_analyzer now accept multiple inputs/outputs.
+* Batching of input instances is now done automatically and dynamically.
+* Added analyzers to compute covariance matrices (`tft.covariance`) and
+  principal components for PCA (`tft.pca`).
+* CombinerSpec and combine_analyzer now accept multiple inputs/outputs.
 
 ## Bug Fixes and Other Changes
-
-* Fixes a bug where TransformDataset would not return correct output if the
-  output DatasetMetadata contained deferred values (such as vocabularies).
-* Added checks that the preprocessing function's outputs all have the same
-  size in the batch dimension.
-* Added `tft.apply_buckets` which takes an input tensor and a list of bucket
-  boundaries, and returns bucketized data.
-* `tft.bucketize` and `tft.apply_buckets` now set metadata for the output
-  tensor, which means the resulting tf.Metadata for the output of these
-  functions will contain min and max values based on the number of buckets,
-  and also be set to categorical.
-* Testing helper function assertAnalyzeAndTransformResults can now also test
-  the content of vocabulary files and other assets.
-* Reduces the number of beam stages needed for certain analyzers, which can be
-  a performance bottleneck when transforming many features.
-* Performance improvements in `tft.uniques`.
-* Fix a bug in `tft.bucketize` where the bucket boundary could be the same as a
-  min/max value, and was getting dropped.
-* Allows scaling individual components of a tensor independently with
-  `tft.scale_by_min_max`, `tft.scale_to_0_1`, and `tft.scale_to_z_score`.
-* Fix a bug where `apply_saved_transform` could only be applied in the global
-  name scope.
+* Depends on `apache-beam[gcp]>=2.2,<3`.
+* Fixes a bug where TransformDataset would not return correct output if the
+  output DatasetMetadata contained deferred values (such as vocabularies).
+* Added checks that the preprocessing function's outputs all have the same
+  size in the batch dimension.
+* Added `tft.apply_buckets` which takes an input tensor and a list of bucket
+  boundaries, and returns bucketized data.
+* `tft.bucketize` and `tft.apply_buckets` now set metadata for the output
+  tensor, which means the resulting tf.Metadata for the output of these
+  functions will contain min and max values based on the number of buckets,
+  and also be set to categorical.
+* Testing helper function assertAnalyzeAndTransformResults can now also test
+  the content of vocabulary files and other assets.
+* Reduces the number of beam stages needed for certain analyzers, which can be
+  a performance bottleneck when transforming many features.
+* Performance improvements in `tft.uniques`.
+* Fix a bug in `tft.bucketize` where the bucket boundary could be the same as a
+  min/max value, and was getting dropped.
+* Allows scaling individual components of a tensor independently with
+  `tft.scale_by_min_max`, `tft.scale_to_0_1`, and `tft.scale_to_z_score`.
+* Fix a bug where `apply_saved_transform` could only be applied in the global
+  name scope.
+* Add a warning when `frequency_threshold` is <= 1. This is a no-op and
+  generally reflects mistaking `frequency_threshold` for a relative frequency
+  when in fact it is an absolute frequency.
 
 ## Breaking changes
 * The interfaces of CombinerSpec and combine_analyzer have changed to allow
   for multiple inputs/outputs.
+* Requires pre-installed TensorFlow >=1.5,<2.
 
 ## Deprecations
 
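A hedged sketch of the bucketing entries above (the feature name 'age' is hypothetical): `tft.bucketize` analyzes the dataset for approximate quantile boundaries and returns bucket indices, while `tft.apply_buckets` does the same mapping against caller-supplied boundaries.

import tensorflow_transform as tft

def preprocessing_fn(inputs):
  # Computes approximate quartile boundaries over the full dataset, then
  # maps each value to a bucket index in [0, 4). Per the notes above, the
  # output metadata is marked categorical with min/max derived from
  # num_buckets.
  return {'age_bucketized': tft.bucketize(inputs['age'], num_buckets=4)}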

setup.py

Lines changed: 1 addition & 1 deletion
@@ -22,7 +22,7 @@
 
 def _make_required_install_packages():
   return [
-      'apache-beam[gcp]>=2.2,<3',
+      'apache-beam[gcp]>=2.3,<3',
 
       # Protobuf libraries < 3.3 contain some map-related data corruption bugs
       # (b/35874111).
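A minimal sketch of how such a pin is consumed by setuptools (the package metadata here is illustrative, not this project's):

from setuptools import find_packages, setup

setup(
    name='my-tft-style-package',  # illustrative name, not the real package
    version='0.0.1',
    packages=find_packages(),
    # '>=2.3,<3' keeps pip on the Beam 2.x series, at or above the tested
    # 2.3.0 release from the compatibility table in the README.
    install_requires=['apache-beam[gcp]>=2.3,<3'],
)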

tensorflow_transform/analyzers.py

Lines changed: 8 additions & 2 deletions
@@ -479,8 +479,10 @@ def uniques(x, top_k=None, frequency_threshold=None,
     top_k: Limit the generated vocabulary to the first `top_k` elements. If set
       to None, the full vocabulary is generated.
     frequency_threshold: Limit the generated vocabulary only to elements whose
-      frequency is >= the supplied threshold. If set to None, the full
-      vocabulary is generated.
+      absolute frequency is >= the supplied threshold. If set to None, the
+      full vocabulary is generated. Absolute frequency means the number of
+      occurrences of the element in the dataset, as opposed to the proportion
+      of instances that contain that element.
     vocab_filename: The file name for the vocabulary file. If none, the
       "uniques" scope name in the context of this graph will be used as the file
       name. If not None, should be unique within a given preprocessing function.
@@ -509,6 +511,10 @@ def uniques(x, top_k=None, frequency_threshold=None,
     raise ValueError(
         'frequency_threshold must be non-negative, but got: %r' %
         frequency_threshold)
+  elif frequency_threshold <= 1:
+    tf.logging.warn(
+        'frequency_threshold %d <= 1 is a no-op, use None instead.',
+        frequency_threshold)
 
   if isinstance(x, tf.SparseTensor):
     x = x.values
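A hedged usage sketch of the documented behavior (the feature name 'terms' is hypothetical):

import tensorflow_transform as tft

def preprocessing_fn(inputs):
  # Keep only vocabulary entries that occur at least 5 times across the
  # whole dataset; per the docstring, the threshold is an absolute count,
  # not a proportion of instances.
  tft.uniques(inputs['terms'], frequency_threshold=5)
  # A threshold <= 1 now triggers the warning added above, since every
  # element in the vocabulary occurs at least once.
  return {'terms': inputs['terms']}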

tensorflow_transform/beam/analyzer_impls.py

Lines changed: 1 addition & 9 deletions
@@ -19,7 +19,6 @@
 
 import collections
 import os
-import random
 
 
 import apache_beam as beam
@@ -157,14 +156,7 @@ def expand(self, pcoll):
     # via AsIter. By breaking fusion, we allow sharded files' sizes to be
     # automatically computed (when possible), so we end up reading from fewer
     # and larger files.
-    @beam.ptransform_fn
-    def Reshard(pcoll):  # pylint: disable=invalid-name
-      return (
-          pcoll
-          | 'PairWithRandom' >> beam.Map(lambda x: (random.getrandbits(32), x))
-          | 'GroupByRandom' >> beam.GroupByKey()
-          | 'ExtractValues' >> beam.FlatMap(lambda x: x[1]))
-    counts |= 'Reshard' >> Reshard()  # pylint: disable=no-value-for-parameter
+    counts |= 'Reshard' >> beam.transforms.Reshuffle()  # pylint: disable=no-value-for-parameter
 
     # Using AsIter instead of AsList below in order to reduce max memory
     # usage (due to AsList caching).
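The removed hand-rolled Reshard did a pair-with-random-key, GroupByKey, extract-values round trip; Beam's built-in Reshuffle achieves the same fusion break. A standalone sketch of the pattern (the sample data is illustrative):

import apache_beam as beam

with beam.Pipeline() as pipeline:
  _ = (
      pipeline
      | 'CreateCounts' >> beam.Create([('the', 3), ('cat', 1), ('sat', 1)])
      # Reshuffle materializes and redistributes the elements, so the runner
      # can re-balance shards between the stages before and after it.
      | 'Reshard' >> beam.transforms.Reshuffle()
      | 'Format' >> beam.Map(lambda kv: '%s: %d' % kv))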

tensorflow_transform/beam/impl.py

Lines changed: 39 additions & 11 deletions
@@ -241,11 +241,13 @@ class _GraphState(object):
   def __init__(self, saved_model_dir, input_schema, exclude_outputs,
                tf_config):
     self.saved_model_dir = saved_model_dir
-    self.session = tf.Session(graph=tf.Graph(), config=tf_config)
-    with self.session.graph.as_default():
-      with tf.Session(config=tf_config):
+    graph = tf.Graph()
+    self.session = tf.Session(graph=graph, config=tf_config)
+    with graph.as_default():
+      with self.session.as_default():
         inputs, outputs = saved_transform_io.partially_apply_saved_transform(
             saved_model_dir, {})
+        self.session.run(tf.global_variables_initializer())
         self.session.run(tf.tables_initializer())
 
         input_schema_keys = input_schema.column_schemas.keys()
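The added initializer call follows the usual TF 1.x session pattern; a minimal sketch, independent of tf.Transform (assumes a 1.x runtime, per the version check below):

import tensorflow as tf

graph = tf.Graph()
with graph.as_default():
  v = tf.get_variable('v', initializer=tf.constant(1.0))

with tf.Session(graph=graph) as session:
  # In TF 1.x, variables and lookup tables hold no values until their
  # initializer ops run in a session, hence the explicit calls above.
  session.run(tf.global_variables_initializer())
  session.run(tf.tables_initializer())
  assert session.run(v) == 1.0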
@@ -342,9 +344,9 @@ def process(self, batch, saved_model_dir):
 def _assert_tensorflow_version():
   # Fail with a clear error in case we are not using a compatible TF version.
   major, minor, _ = tf.__version__.split('.')
-  if int(major) != 1 or int(minor) < 4:
+  if int(major) != 1 or int(minor) < 5:
     raise RuntimeError(
-        'Tensorflow version >= 1.4, < 2 is required. Found (%s). Please '
+        'TensorFlow version >= 1.5, < 2 is required. Found (%s). Please '
         'install the latest 1.x version from '
         'https://github.com/tensorflow/tensorflow. ' % tf.__version__)

@@ -372,6 +374,8 @@ def _write_saved_transform(graph, inputs, outputs, saved_model_dir):
       removed_collections.append((collection_name,
                                   graph.get_collection(collection_name)))
       graph.clear_collection(collection_name)
+    # Initialize all variables so they can be saved.
+    session.run(tf.global_variables_initializer())
     saved_transform_io.write_saved_transform_from_session(
         session, inputs, outputs, saved_model_dir)
     for collection_name, collection in removed_collections:
@@ -478,6 +482,7 @@ def replace_tensors_with_constant_values(saved_model_dir,
     input_tensors, output_tensors = (
         saved_transform_io.partially_apply_saved_transform(
             saved_model_dir, {}, tensor_replacement_map))
+    session.run(tf.global_variables_initializer())
     saved_transform_io.write_saved_transform_from_session(
         session, input_tensors, output_tensors, temp_dir)
   return temp_dir
@@ -602,6 +607,7 @@ def extract_scalar_constants(tensor_names, saved_model_dir,
     tensor_output_map = (
         saved_transform_io.fetch_tensor_values(
             saved_model_dir, tensor_replacement_map, tensor_names))
+    session.run(tf.global_variables_initializer())
     session.run(tf.tables_initializer())
     return session.run(tensor_output_map)

@@ -650,21 +656,43 @@ def expand(self, dataset):
 
     Returns:
       A TransformFn containing the deferred transform function.
-    """
+
+    Raises:
+      ValueError: If preprocessing_fn has no outputs.
+    """
     input_values, input_metadata = dataset
     input_schema = input_metadata.schema
 
     base_temp_dir = Context.create_base_temp_dir()
 
     graph = tf.Graph()
     with graph.as_default():
+
+      with tf.name_scope('inputs'):
+        inputs = input_schema.as_batched_placeholders()
+      # In order to avoid a bug where import_graph_def fails when the input_map
+      # and return_elements of an imported graph are the same (b/34288791), we
+      # avoid using the placeholder of an input column as an output of a graph.
+      # We do this by applying tf.identity to all inputs of the
+      # preprocessing_fn. Note this applies at the level of raw tensors.
+      outputs = self._preprocessing_fn(impl_helper.copy_tensors(inputs))
+
+      # At this point we check that the preprocessing_fn has at least one
+      # output. This is because if we allowed the output of preprocessing_fn to
+      # be empty, we wouldn't be able to determine how many instances to
+      # "unbatch" the output into.
+      if not outputs:
+        raise ValueError('The preprocessing function returned an empty dict')
+
+      if graph.get_collection(tf.GraphKeys.TRAINABLE_VARIABLES):
+        raise ValueError(
+            'The preprocessing function contained trainable variables '
+            '{}'.format(
+                graph.get_collection_ref(tf.GraphKeys.TRAINABLE_VARIABLES)))
+
       # NOTE: it's important that create_phases is called directly after
-      # run_preprocessing_fn, because we later mutate the graph's
-      # TABLE_INITIALIZERS collection which would break the logic in
-      # create_phases.
-      inputs, outputs = impl_helper.run_preprocessing_fn(
-          self._preprocessing_fn, input_schema)
+      # preprocessing_fn, because we later mutate the graph's TABLE_INITIALIZERS
+      # collection which would break the logic in create_phases.
       phases = impl_helper.create_phases()
 
       # Iterate through levels. tensor_pcoll_mapping is a mapping from tensor
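The tf.identity workaround described in the comment above can be illustrated with a simplified sketch; the real impl_helper.copy_tensors presumably also handles SparseTensors, which this version (an assumption, not the library's code) omits:

import tensorflow as tf

def copy_tensors_sketch(tensors):
  # Route every input through tf.identity so that no raw placeholder is
  # both an input_map entry and a return_element when the graph is later
  # re-imported (works around b/34288791).
  return {name: tf.identity(tensor) for name, tensor in tensors.items()}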

tensorflow_transform/beam/impl_test.py

Lines changed: 89 additions & 1 deletion
@@ -20,6 +20,7 @@
 import math
 import os
 import random
+import shutil
 
 
 import apache_beam as beam
@@ -42,8 +43,9 @@
 from tensorflow_transform.tf_metadata import dataset_schema as sch
 from tensorflow_transform.tf_metadata import metadata_io
 
+from google.protobuf import text_format
 import unittest
-# pylint: enable=g-import-not-at-top
+from tensorflow.core.example import example_pb2
 
 
 class BeamImplTest(tft_unit.TransformTestCase):
@@ -367,6 +369,91 @@ def preprocessing_fn(inputs):
         input_data, input_metadata, preprocessing_fn, expected_data,
         expected_metadata)
 
+  def testRawFeedDictInput(self):
+    # Test the ability to feed raw data into AnalyzeDataset and
+    # TransformDataset by using subclasses of these transforms which create
+    # batches of size 1.
+    def preprocessing_fn(inputs):
+      sequence_example = inputs['sequence_example']
+
+      # Ordinarily this would have shape (batch_size,) since 'sequence_example'
+      # was defined as a FixedLenFeature with shape (). But since we specified
+      # desired_batch_size, we can assume that the shape is (1,), and reshape
+      # to ().
+      sequence_example = tf.reshape(sequence_example, ())
+
+      # Parse the sequence example.
+      feature_spec = {
+          'x': tf.FixedLenSequenceFeature(shape=[], dtype=tf.string,
+                                          default_value=None)
+      }
+      _, sequences = tf.parse_single_sequence_example(
+          sequence_example, sequence_features=feature_spec)
+
+      # Create a batch based on the sequence "x".
+      return {'x': sequences['x']}
+
+    def text_sequence_example_to_binary(text_proto):
+      proto = text_format.Merge(text_proto, example_pb2.SequenceExample())
+      return proto.SerializeToString()
+
+    sequence_examples = [
+        """
+        feature_lists: {
+          feature_list: {
+            key: "x"
+            value: {
+              feature: {bytes_list: {value: 'ab'}}
+              feature: {bytes_list: {value: ''}}
+              feature: {bytes_list: {value: 'c'}}
+              feature: {bytes_list: {value: 'd'}}
+            }
+          }
+        }
+        """,
+        """
+        feature_lists: {
+          feature_list: {
+            key: "x"
+            value: {
+              feature: {bytes_list: {value: 'ef'}}
+              feature: {bytes_list: {value: 'g'}}
+            }
+          }
+        }
+        """
+    ]
+    input_data = [
+        {'sequence_example': text_sequence_example_to_binary(sequence_example)}
+        for sequence_example in sequence_examples]
+    input_metadata = dataset_metadata.DatasetMetadata({
+        'sequence_example': sch.ColumnSchema(tf.string, [],
+                                             sch.FixedColumnRepresentation())
+    })
+    expected_data = [
+        {'x': 'ab'},
+        {'x': ''},
+        {'x': 'c'},
+        {'x': 'd'},
+        {'x': 'ef'},
+        {'x': 'g'}
+    ]
+    expected_metadata = dataset_metadata.DatasetMetadata({
+        'x': sch.ColumnSchema(tf.string, [], sch.FixedColumnRepresentation())
+    })
+
+    with beam_impl.Context(temp_dir=self.get_temp_dir(), desired_batch_size=1):
+      transform_fn = ((input_data, input_metadata)
+                      | beam_impl.AnalyzeDataset(preprocessing_fn))
+      transformed_data, transformed_metadata = (
+          ((input_data, input_metadata), transform_fn)
+          | beam_impl.TransformDataset())
+
+    self.assertDataCloseOrEqual(expected_data, transformed_data)
+    transformed_metadata = self._resolveDeferredMetadata(transformed_metadata)
+    self.assertEqual(expected_metadata.schema.column_schemas,
+                     transformed_metadata.schema.column_schemas)
+    self.assertEqual(expected_metadata, transformed_metadata)
+
   def testAnalyzerBeforeMap(self):
     def preprocessing_fn(inputs):
       return {'x_scaled': tft.scale_to_0_1(inputs['x'])}
@@ -2595,5 +2682,6 @@ def testBucketizationEqualDistributionInterleaved(self):
         inputs, expected_buckets, tf.int32, num_buckets=101)
 
 
+
 if __name__ == '__main__':
   unittest.main()
