# coding=utf-8
# Copyright 2018 The Google AI Language Team Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
- from __future__ import absolute_import
- from __future__ import division
- from __future__ import print_function
- import collections
- import json
- import random
- import re
- import modeling
- import six
- import tensorflow as tf
class BertModelTest(tf.test.TestCase):
    """Tests for `modeling.BertModel` and `modeling.BertConfig`.

    The model is built from random integer inputs, its outputs are
    shape-checked, and the resulting graph is scanned for ops that cannot
    be reached from the fetched outputs.
    """

    class BertModelTester(object):
        """Holds one test configuration and builds/checks a model for it."""

        def __init__(self,
                     parent,
                     batch_size=13,
                     seq_length=7,
                     is_training=True,
                     use_input_mask=True,
                     use_token_type_ids=True,
                     vocab_size=99,
                     hidden_size=32,
                     num_hidden_layers=5,
                     num_attention_heads=4,
                     intermediate_size=37,
                     hidden_act="gelu",
                     hidden_dropout_prob=0.1,
                     attention_probs_dropout_prob=0.1,
                     max_position_embeddings=512,
                     type_vocab_size=16,
                     initializer_range=0.02,
                     scope=None):
            """Stores the configuration.

            Args:
              parent: The test case whose assertion methods `check_output`
                calls.
              batch_size: First dimension of every generated input tensor.
              seq_length: Second dimension of every generated input tensor.
              is_training: Forwarded to `modeling.BertModel`.
              use_input_mask: If true, a random 0/1 `input_mask` is passed
                to the model; otherwise `None` is passed.
              use_token_type_ids: If true, random `token_type_ids` are
                passed to the model; otherwise `None` is passed.
              scope: Forwarded to `modeling.BertModel`.

            All remaining arguments are forwarded unchanged to
            `modeling.BertConfig` in `create_model`; `vocab_size` and
            `type_vocab_size` additionally bound the random ids generated
            there.
            """
            self.parent = parent
            self.batch_size = batch_size
            self.seq_length = seq_length
            self.is_training = is_training
            self.use_input_mask = use_input_mask
            self.use_token_type_ids = use_token_type_ids
            self.vocab_size = vocab_size
            self.hidden_size = hidden_size
            self.num_hidden_layers = num_hidden_layers
            self.num_attention_heads = num_attention_heads
            self.intermediate_size = intermediate_size
            self.hidden_act = hidden_act
            self.hidden_dropout_prob = hidden_dropout_prob
            self.attention_probs_dropout_prob = attention_probs_dropout_prob
            self.max_position_embeddings = max_position_embeddings
            self.type_vocab_size = type_vocab_size
            self.initializer_range = initializer_range
            self.scope = scope

        def create_model(self):
            """Builds a BertModel on random inputs.

            Returns:
              A dict mapping "embedding_output", "sequence_output",
              "pooled_output" and "all_encoder_layers" to the values
              returned by the model's corresponding getters.
            """
            ids_shape = [self.batch_size, self.seq_length]
            input_ids = BertModelTest.ids_tensor(ids_shape, self.vocab_size)

            input_mask = None
            if self.use_input_mask:
                # vocab_size=2 restricts the generated values to {0, 1}.
                input_mask = BertModelTest.ids_tensor(ids_shape, vocab_size=2)

            token_type_ids = None
            if self.use_token_type_ids:
                token_type_ids = BertModelTest.ids_tensor(
                    ids_shape, self.type_vocab_size)

            config = modeling.BertConfig(
                vocab_size=self.vocab_size,
                hidden_size=self.hidden_size,
                num_hidden_layers=self.num_hidden_layers,
                num_attention_heads=self.num_attention_heads,
                intermediate_size=self.intermediate_size,
                hidden_act=self.hidden_act,
                hidden_dropout_prob=self.hidden_dropout_prob,
                attention_probs_dropout_prob=self.attention_probs_dropout_prob,
                max_position_embeddings=self.max_position_embeddings,
                type_vocab_size=self.type_vocab_size,
                initializer_range=self.initializer_range)

            model = modeling.BertModel(
                config=config,
                is_training=self.is_training,
                input_ids=input_ids,
                input_mask=input_mask,
                token_type_ids=token_type_ids,
                scope=self.scope)

            return {
                "embedding_output": model.get_embedding_output(),
                "sequence_output": model.get_sequence_output(),
                "pooled_output": model.get_pooled_output(),
                "all_encoder_layers": model.get_all_encoder_layers(),
            }

        def check_output(self, result):
            """Asserts the expected shapes of the evaluated model outputs.

            Args:
              result: Dict with the keys produced by `create_model`, holding
                evaluated values that expose a `.shape`. The
                "all_encoder_layers" entry is not checked here.
            """
            per_token_shape = [
                self.batch_size, self.seq_length, self.hidden_size
            ]
            self.parent.assertAllEqual(
                result["embedding_output"].shape, per_token_shape)
            self.parent.assertAllEqual(
                result["sequence_output"].shape, per_token_shape)
            self.parent.assertAllEqual(
                result["pooled_output"].shape,
                [self.batch_size, self.hidden_size])

    def test_default(self):
        """Runs the tester with its default configuration."""
        self.run_tester(BertModelTest.BertModelTester(self))

    def test_config_to_json_string(self):
        """Checks that a config's JSON string round-trips its fields."""
        config = modeling.BertConfig(vocab_size=99, hidden_size=37)
        obj = json.loads(config.to_json_string())
        self.assertEqual(obj["vocab_size"], 99)
        self.assertEqual(obj["hidden_size"], 37)

    def run_tester(self, tester):
        """Builds the tester's model, evaluates it and validates the graph.

        Args:
          tester: Object providing `create_model()` and `check_output()`.
        """
        # NOTE(review): newer TF 1.x releases reportedly deprecate
        # `test_session()` in favour of `session()` / `cached_session()`;
        # kept as-is so older releases keep working -- confirm before
        # changing.
        with self.test_session() as sess:
            ops = tester.create_model()
            init_op = tf.group(tf.global_variables_initializer(),
                               tf.local_variables_initializer())
            sess.run(init_op)
            output_result = sess.run(ops)
            tester.check_output(output_result)

            self.assert_all_tensors_reachable(sess, [init_op, ops])

    @classmethod
    def ids_tensor(cls, shape, vocab_size, rng=None, name=None):
        """Creates a random int32 tensor of the shape within the vocab size.

        Args:
          shape: Iterable of ints giving the tensor shape.
          vocab_size: Values are drawn from `[0, vocab_size - 1]` inclusive.
          rng: Optional `random.Random`; a fresh unseeded one is created
            when omitted.
          name: Optional name passed to `tf.constant`.

        Returns:
          The value returned by `tf.constant` for the generated ids.
        """
        if rng is None:
            rng = random.Random()

        total_dims = 1
        for dim in shape:
            total_dims *= dim

        values = [rng.randint(0, vocab_size - 1) for _ in range(total_dims)]

        return tf.constant(
            value=values, dtype=tf.int32, shape=shape, name=name)

    def assert_all_tensors_reachable(self, sess, outputs):
        """Checks that all the tensors in the graph are reachable from outputs.

        Ops whose names match one of the ignore patterns below are exempt.

        Args:
          sess: Session whose `graph` is inspected.
          outputs: Possibly nested list/tuple/dict of graph elements that
            serve as the starting points of the reachability search.
        """
        graph = sess.graph

        ignore_strings = [
            r"^.*/assert_less_equal/.*$",
            r"^.*/dilation_rate$",
            r"^.*/Tensordot/concat$",
            r"^.*/Tensordot/concat/axis$",
            r"^testing/.*$",
        ]
        ignore_regexes = [re.compile(x) for x in ignore_strings]

        unreachable = [
            op for op in self.get_unreachable_ops(graph, outputs)
            if not any(regex.match(op.name) for regex in ignore_regexes)
        ]

        self.assertEqual(
            len(unreachable), 0, "The following ops are unreachable: %s" %
            (" ".join([x.name for x in unreachable])))

    @classmethod
    def get_unreachable_ops(cls, graph, outputs):
        """Finds all of the tensors in graph that are unreachable from outputs.

        Args:
          graph: Object exposing `get_operations()`; each operation must
            provide `name`, `type`, `inputs` and `outputs`.
          outputs: Possibly nested list/tuple/dict of objects with a `name`;
            these seed the search.

        Returns:
          The operations, in `graph.get_operations()` order, that have at
          least one input or output tensor name the search never visited.
        """
        outputs = cls.flatten_recursive(outputs)

        output_to_op = collections.defaultdict(list)  # tensor -> producers
        op_to_all = collections.defaultdict(list)  # op -> its in/out tensors
        assign_out_to_in = collections.defaultdict(list)
        for op in graph.get_operations():
            for x in op.inputs:
                op_to_all[op.name].append(x.name)
            for y in op.outputs:
                output_to_op[y.name].append(op.name)
                op_to_all[op.name].append(y.name)
            if str(op.type) == "Assign":
                for y in op.outputs:
                    for x in op.inputs:
                        assign_out_to_in[y.name].append(x.name)

        # Visiting any input of an "Assign" op also visits that op's output
        # and its other inputs, so everything feeding an assignment becomes
        # reachable as soon as one of its inputs is.
        assign_groups = collections.defaultdict(list)
        for out_name, name_group in assign_out_to_in.items():
            for n1 in name_group:
                assign_groups[n1].append(out_name)
                assign_groups[n1].extend(
                    n2 for n2 in name_group if n1 != n2)

        # Depth-first walk from the outputs towards producers. A set gives
        # O(1) visited checks; `.get` avoids inserting defaultdict entries.
        seen_tensors = set()
        stack = [x.name for x in outputs]
        while stack:
            name = stack.pop()
            if name in seen_tensors:
                continue
            seen_tensors.add(name)

            for op_name in output_to_op.get(name, ()):
                stack.extend(
                    tensor_name for tensor_name in op_to_all.get(op_name, ())
                    if tensor_name not in seen_tensors)

            stack.extend(
                assign_name for assign_name in assign_groups.get(name, ())
                if assign_name not in seen_tensors)

        unreachable_ops = []
        for op in graph.get_operations():
            all_names = ([x.name for x in op.inputs] +
                         [x.name for x in op.outputs])
            # Ops with no inputs and no outputs are never reported.
            if any(name not in seen_tensors for name in all_names):
                unreachable_ops.append(op)
        return unreachable_ops

    @classmethod
    def flatten_recursive(cls, item):
        """Flattens (potentially nested) a tuple/dictionary/list to a list.

        Dictionaries contribute their values only; any object that is not a
        list, tuple or dict is treated as a leaf.

        Args:
          item: The structure (or single leaf) to flatten.

        Returns:
          A flat list of the leaves.
        """
        if isinstance(item, (list, tuple)):
            children = item
        elif isinstance(item, dict):
            children = six.itervalues(item)
        else:
            return [item]

        flat_output = []
        for child in children:
            flat_output.extend(cls.flatten_recursive(child))
        return flat_output
if __name__ == "__main__":
    # Executed as a script: hand control to TensorFlow's test runner.
    tf.test.main()
|