
Reading the TensorFlow rnn_cell_impl Source Code

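A module implementing RNN cells. It provides a number of basic, commonly used RNN cells, such as LSTM (Long Short-Term Memory) and GRU (Gated Recurrent Unit), along with operators that add dropout, projections, or embeddings to the inputs. Multi-layer cells are built with the MultiRNNCell class, or by calling the RNN ops several times.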

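An introduction to the LSTM and GRU architectures: https://blog.csdn.net/qq_28743951/article/details/78974058
Three simplifications and one diagram, a single trick for understanding LSTM/GRU gating: https://www.jiqizhixin.com/articles/2018-12-18-12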

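In theory an RNN can capture long-range dependencies, but in practice it faces two challenges: exploding gradients (gradient explosion) and vanishing gradients (vanishing gradient).

  1. Exploding gradients are comparatively easy to handle with gradient clipping: set a threshold, and however large the gradient grows while it is propagated back through the RNN, cap it at that threshold.
  2. Vanishing gradients are much harder to deal with, and mitigating them is a central research question for RNNs and for almost every other deep learning method. LSTM and GRU use gates to control the flow of information inside the RNN and thereby mitigate the vanishing gradient problem. The core idea is to process the input selectively.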
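The thresholding idea in item 1 can be sketched in a few lines. This is a minimal, hedged illustration of one common variant (clipping by the joint L2 norm of all gradients), not the routine TensorFlow itself uses; the function name `clip_by_global_norm` and the toy gradient values are assumptions made for the example.

```python
import math

def clip_by_global_norm(grads, max_norm):
    """Rescale a list of gradient vectors so their joint L2 norm is at most max_norm.

    Returns the (possibly rescaled) gradients and the norm measured before clipping.
    """
    global_norm = math.sqrt(sum(g * g for vec in grads for g in vec))
    if global_norm <= max_norm:
        # Already within the threshold: return an unchanged copy.
        return [list(vec) for vec in grads], global_norm
    scale = max_norm / global_norm
    return [[g * scale for g in vec] for vec in grads], global_norm

# Toy gradients for two parameter groups; joint norm = sqrt(9 + 16 + 144) = 13.
grads = [[3.0, 4.0], [12.0]]
clipped, norm_before = clip_by_global_norm(grads, max_norm=6.5)
print(norm_before, clipped)
```

Rescaling every gradient by the same factor keeps the update direction unchanged and only limits its length, which is why this variant is often preferred over clipping each element independently.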

Here is an example of how an LSTM deals with the vanishing gradient problem. Suppose we read the following product review:

Amazing! This box of cereal gave me a perfectly balanced breakfast, as all things should be. I only ate half of it but will definitely be buying again!

When reading it, we concentrate on a few key words and process those rather than every word.

The key to LSTM and GRU is that they selectively ignore some of the words, keep them out of the update to the hidden state vector, and in the end retain only the relevant information for making the prediction.

tensorflow/tensorflow/python/ops/rnn_cell_impl.py

"""Module implementing RNN Cells.
This module provides a number of basic commonly used RNN cells, such as LSTM
(Long Short Term Memory) or GRU (Gated Recurrent Unit), and a number of
operators that allow adding dropouts, projections, or embeddings for inputs.
Constructing multi-layer cells is supported by the class MultiRNNCell, or by
calling the rnn ops several times.
"""

class RNNCell(base_layer.Layer)

Key points:

  • Every RNNCell must have the properties below and implement call with the signature (output, next_state) = call(input, state).
  • An RNN cell, in the most abstract setting, is anything that has a state and performs some operation that takes a matrix of inputs. This operation results in an output matrix with self.output_size columns.
@tf_export("nn.rnn_cell.RNNCell")
class RNNCell(base_layer.Layer):
  """Abstract object representing an RNN cell.

  Every `RNNCell` must have the properties below and implement `call` with
  the signature `(output, next_state) = call(input, state)`. The optional
  third input argument, `scope`, is allowed for backwards compatibility
  purposes; but should be left off for new subclasses.

  This definition of cell differs from the definition used in the literature.
  In the literature, 'cell' refers to an object with a single scalar output.
  This definition refers to a horizontal array of such units.

  An RNN cell, in the most abstract setting, is anything that has
  a state and performs some operation that takes a matrix of inputs.
  This operation results in an output matrix with `self.output_size` columns.
  If `self.state_size` is an integer, this operation also results in a new
  state matrix with `self.state_size` columns. If `self.state_size` is a
  (possibly nested tuple of) TensorShape object(s), then it should return a
  matching structure of Tensors having shape `[batch_size].concatenate(s)`
  for each `s` in `self.batch_size`.
  """

  def __init__(self, trainable=True, name=None, dtype=None, **kwargs):
    super(RNNCell, self).__init__(
        trainable=trainable, name=name, dtype=dtype, **kwargs)
    # Attribute that indicates whether the cell is a TF RNN cell, due the slight
    # difference between TF and Keras RNN cell.
    self._is_tf_rnn_cell = True

  def __call__(self, inputs, state, scope=None):
    """Run this RNN cell on inputs, starting from the given state.

    Args:
      inputs: `2-D` tensor with shape `[batch_size, input_size]`.
      state: if `self.state_size` is an integer, this should be a `2-D Tensor`
        with shape `[batch_size, self.state_size]`. Otherwise, if
        `self.state_size` is a tuple of integers, this should be a tuple
        with shapes `[batch_size, s] for s in self.state_size`.
      scope: VariableScope for the created subgraph; defaults to class name.

    Returns:
      A pair containing:
      - Output: A `2-D` tensor with shape `[batch_size, self.output_size]`.
      - New state: Either a single `2-D` tensor, or a tuple of tensors matching
        the arity and shapes of `state`.
    """
    if scope is not None:
      with vs.variable_scope(scope,
                             custom_getter=self._rnn_get_variable) as scope:
        return super(RNNCell, self).__call__(inputs, state, scope=scope)
    else:
      scope_attrname = "rnncell_scope"
      scope = getattr(self, scope_attrname, None)
      if scope is None:
        scope = vs.variable_scope(vs.get_variable_scope(),
                                  custom_getter=self._rnn_get_variable)
        setattr(self, scope_attrname, scope)
      with scope:
        return super(RNNCell, self).__call__(inputs, state)

  def _rnn_get_variable(self, getter, *args, **kwargs):
    variable = getter(*args, **kwargs)
    if context.executing_eagerly():
      trainable = variable._trainable  # pylint: disable=protected-access
    else:
      trainable = (
          variable in tf_variables.trainable_variables() or
          (isinstance(variable, tf_variables.PartitionedVariable) and
           list(variable)[0] in tf_variables.trainable_variables()))
    if trainable and variable not in self._trainable_weights:
      self._trainable_weights.append(variable)
    elif not trainable and variable not in self._non_trainable_weights:
      self._non_trainable_weights.append(variable)
    return variable

  @property
  def state_size(self):
    """size(s) of state(s) used by this cell.

    It can be represented by an Integer, a TensorShape or a tuple of Integers
    or TensorShapes.
    """
    raise NotImplementedError("Abstract method")

  @property
  def output_size(self):
    """Integer or TensorShape: size of outputs produced by this cell."""
    raise NotImplementedError("Abstract method")

  def build(self, _):
    # This tells the parent Layer object that it's OK to call
    # self.add_variable() inside the call() method.
    pass

  def get_initial_state(self, inputs=None, batch_size=None, dtype=None):
    if inputs is not None:
      # Validate the given batch_size and dtype against inputs if provided.
      inputs = ops.convert_to_tensor(inputs, name="inputs")
      if batch_size is not None:
        if tensor_util.is_tensor(batch_size):
          static_batch_size = tensor_util.constant_value(
              batch_size, partial=True)
        else:
          static_batch_size = batch_size
        if inputs.shape.dims[0].value != static_batch_size:
          raise ValueError(
              "batch size from input tensor is different from the "
              "input param. Input tensor batch: {}, batch_size: {}".format(
                  inputs.shape.dims[0].value, batch_size))

      if dtype is not None and inputs.dtype != dtype:
        raise ValueError(
            "dtype from input tensor is different from the "
            "input param. Input tensor dtype: {}, dtype: {}".format(
                inputs.dtype, dtype))

      batch_size = inputs.shape.dims[0].value or array_ops.shape(inputs)[0]
      dtype = inputs.dtype
    if None in [batch_size, dtype]:
      raise ValueError(
          "batch_size and dtype cannot be None while constructing initial "
          "state: batch_size={}, dtype={}".format(batch_size, dtype))
    return self.zero_state(batch_size, dtype)

  def zero_state(self, batch_size, dtype):
    """Return zero-filled state tensor(s).

    Args:
      batch_size: int, float, or unit Tensor representing the batch size.
      dtype: the data type to use for the state.

    Returns:
      If `state_size` is an int or TensorShape, then the return value is a
      `N-D` tensor of shape `[batch_size, state_size]` filled with zeros.
      If `state_size` is a nested list or tuple, then the return value is
      a nested list or tuple (of the same structure) of `2-D` tensors with
      the shapes `[batch_size, s]` for each s in `state_size`.
    """
    # Try to use the last cached zero_state. This is done to avoid recreating
    # zeros, especially when eager execution is enabled.
    state_size = self.state_size
    is_eager = context.executing_eagerly()
    if is_eager and hasattr(self, "_last_zero_state"):
      (last_state_size, last_batch_size, last_dtype,
       last_output) = getattr(self, "_last_zero_state")
      if (last_batch_size == batch_size and
          last_dtype == dtype and
          last_state_size == state_size):
        return last_output
    with ops.name_scope(type(self).__name__ + "ZeroState", values=[batch_size]):
      output = _zero_state_tensors(state_size, batch_size, dtype)
    if is_eager:
      self._last_zero_state = (state_size, batch_size, dtype, output)
    return output
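To make the `(output, next_state) = call(input, state)` contract concrete, here is a minimal sketch in plain Python with no TensorFlow dependency. `ToyCounterCell` is a hypothetical class invented for this illustration; it only imitates the shape of the interface (`state_size`, `output_size`, `zero_state`, `call`) and does not reproduce variable scoping or any other `RNNCell` behavior.

```python
class ToyCounterCell:
    """Hypothetical cell: the state is a running sum of the inputs,
    and the output at each step equals the new state."""

    def __init__(self, num_units):
        self._num_units = num_units

    @property
    def state_size(self):
        return self._num_units

    @property
    def output_size(self):
        return self._num_units

    def zero_state(self, batch_size):
        # One zero row of length state_size per batch element.
        return [[0.0] * self.state_size for _ in range(batch_size)]

    def call(self, inputs, state):
        # inputs and state are both [batch_size, num_units] lists of rows.
        new_state = [[x + s for x, s in zip(x_row, s_row)]
                     for x_row, s_row in zip(inputs, state)]
        return new_state, new_state


cell = ToyCounterCell(num_units=2)
state = cell.zero_state(batch_size=1)
outputs = []
# Unroll the cell over three time steps, threading the state through.
for x_t in ([[1.0, 2.0]], [[3.0, 4.0]], [[5.0, 6.0]]):
    output, state = cell.call(x_t, state)
    outputs.append(output)
print(outputs, state)
```

The loop is the part worth noticing: the caller, not the cell, is responsible for feeding each step's `next_state` back in as the following step's `state`.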

class LayerRNNCell(RNNCell)

class BasicRNNCell(LayerRNNCell)

class LayerRNNCell(RNNCell):
  """Subclass of RNNCells that act like proper `tf.Layer` objects.

  For backwards compatibility purposes, most `RNNCell` instances allow their
  `call` methods to instantiate variables via `tf.get_variable`. The underlying
  variable scope thus keeps track of any variables, and returning cached
  versions. This is atypical of `tf.layer` objects, which separate this
  part of layer building into a `build` method that is only called once.

  Here we provide a subclass for `RNNCell` objects that act exactly as
  `Layer` objects do. They must provide a `build` method and their
  `call` methods do not access Variables `tf.get_variable`.
  """

  def __call__(self, inputs, state, scope=None, *args, **kwargs):
    """Run this RNN cell on inputs, starting from the given state.

    Args:
      inputs: `2-D` tensor with shape `[batch_size, input_size]`.
      state: if `self.state_size` is an integer, this should be a `2-D Tensor`
        with shape `[batch_size, self.state_size]`. Otherwise, if
        `self.state_size` is a tuple of integers, this should be a tuple
        with shapes `[batch_size, s] for s in self.state_size`.
      scope: optional cell scope.
      *args: Additional positional arguments.
      **kwargs: Additional keyword arguments.

    Returns:
      A pair containing:
      - Output: A `2-D` tensor with shape `[batch_size, self.output_size]`.
      - New state: Either a single `2-D` tensor, or a tuple of tensors matching
        the arity and shapes of `state`.
    """
    # Bypass RNNCell's variable capturing semantics for LayerRNNCell.
    # Instead, it is up to subclasses to provide a proper build
    # method. See the class docstring for more details.
    return base_layer.Layer.__call__(self, inputs, state, scope=scope,
                                     *args, **kwargs)


@tf_export(v1=["nn.rnn_cell.BasicRNNCell"])
class BasicRNNCell(LayerRNNCell):
  """The most basic RNN cell.

  Note that this cell is not optimized for performance. Please use
  `tf.contrib.cudnn_rnn.CudnnRNNTanh` for better performance on GPU.

  Args:
    num_units: int, The number of units in the RNN cell.
    activation: Nonlinearity to use. Default: `tanh`. It could also be string
      that is within Keras activation function names.
    reuse: (optional) Python boolean describing whether to reuse variables
      in an existing scope. If not `True`, and the existing scope already has
      the given variables, an error is raised.
    name: String, the name of the layer. Layers with the same name will
      share weights, but to avoid mistakes we require reuse=True in such
      cases.
    dtype: Default dtype of the layer (default of `None` means use the type
      of the first input). Required when `build` is called before `call`.
    **kwargs: Dict, keyword named properties for common layer attributes, like
      `trainable` etc when constructing the cell from configs of get_config().
  """

  @deprecated(None, "This class is equivalent as tf.keras.layers.SimpleRNNCell,"
              " and will be replaced by that in Tensorflow 2.0.")
  def __init__(self,
               num_units,
               activation=None,
               reuse=None,
               name=None,
               dtype=None,
               **kwargs):
    super(BasicRNNCell, self).__init__(
        _reuse=reuse, name=name, dtype=dtype, **kwargs)
    if context.executing_eagerly() and context.num_gpus() > 0:
      logging.warn("%s: Note that this cell is not optimized for performance. "
                   "Please use tf.contrib.cudnn_rnn.CudnnRNNTanh for better "
                   "performance on GPU.", self)

    # Inputs must be 2-dimensional.
    self.input_spec = input_spec.InputSpec(ndim=2)

    self._num_units = num_units
    if activation:
      self._activation = activations.get(activation)
    else:
      self._activation = math_ops.tanh

  @property
  def state_size(self):
    return self._num_units

  @property
  def output_size(self):
    return self._num_units

  @tf_utils.shape_type_conversion
  def build(self, inputs_shape):
    if inputs_shape[-1] is None:
      raise ValueError("Expected inputs.shape[-1] to be known, saw shape: %s"
                       % str(inputs_shape))

    input_depth = inputs_shape[-1]
    self._kernel = self.add_variable(
        _WEIGHTS_VARIABLE_NAME,
        shape=[input_depth + self._num_units, self._num_units])
    self._bias = self.add_variable(
        _BIAS_VARIABLE_NAME,
        shape=[self._num_units],
        initializer=init_ops.zeros_initializer(dtype=self.dtype))

    self.built = True

  def call(self, inputs, state):
    """Most basic RNN: output = new_state = act(W * input + U * state + B)."""

    gate_inputs = math_ops.matmul(
        array_ops.concat([inputs, state], 1), self._kernel)
    gate_inputs = nn_ops.bias_add(gate_inputs, self._bias)
    output = self._activation(gate_inputs)
    return output, output

  def get_config(self):
    config = {
        "num_units": self._num_units,
        "activation": activations.serialize(self._activation),
        "reuse": self._reuse,
    }
    base_config = super(BasicRNNCell, self).get_config()
    return dict(list(base_config.items()) + list(config.items()))

https://zh.diveintodeeplearning.org/chapter_recurrent-neural-networks/rnn.html

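Suppose $\boldsymbol{X}_t \in \mathbb{R}^{n \times d}$ is the minibatch input at time step $t$ of the sequence, and $\boldsymbol{H}_t \in \mathbb{R}^{n \times h}$ is the hidden variable at that time step. Unlike a multilayer perceptron, here we keep the hidden variable $\boldsymbol{H}_{t-1}$ from the previous time step and introduce a new weight parameter $\boldsymbol{W}_{hh} \in \mathbb{R}^{h \times h}$, which describes how the previous time step's hidden variable is used at the current time step. Specifically, the hidden variable at the current time step is determined jointly by the current input and the previous hidden variable:

$$\boldsymbol{H}_t = \phi(\boldsymbol{X}_t \boldsymbol{W}_{xh} + \boldsymbol{H}_{t-1} \boldsymbol{W}_{hh} + \boldsymbol{b}_h).$$

Compared with a multilayer perceptron, we have added the term $\boldsymbol{H}_{t-1} \boldsymbol{W}_{hh}$. The relationship between the hidden variables $\boldsymbol{H}_t$ and $\boldsymbol{H}_{t-1}$ of adjacent time steps shows that the hidden variable captures the history of the sequence up to the current time step, much like the state or memory of the network at that time step. For this reason the hidden variable is also called the hidden state. Because the hidden state at the current time step is defined in terms of the same definition at the previous time step, the computation is recurrent. A network that computes recurrently in this way is a recurrent neural network.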
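The recurrence can be traced with a small plain-Python sketch. The helper names (`matmul`, `rnn_step`, `run_rnn`), the weight values, and the two toy sequences are all assumptions chosen for illustration; the only thing taken from the text is the rule that the new hidden state is an activation applied to the current input times one weight matrix, plus the previous hidden state times another, plus a bias.

```python
import math

def matmul(a, b):
    """Multiply two matrices stored as lists of rows."""
    return [[sum(x * y for x, y in zip(row, col)) for col in zip(*b)] for row in a]

def rnn_step(x_t, h_prev, w_xh, w_hh, b_h, phi=math.tanh):
    """One step of H_t = phi(X_t W_xh + H_{t-1} W_hh + b_h)."""
    xw = matmul(x_t, w_xh)
    hw = matmul(h_prev, w_hh)
    return [[phi(a + b + c) for a, b, c in zip(xw_row, hw_row, b_h)]
            for xw_row, hw_row in zip(xw, hw)]

def run_rnn(inputs, h0, w_xh, w_hh, b_h):
    """Apply rnn_step over a list of per-time-step inputs, reusing the same weights."""
    h = h0
    for x_t in inputs:
        h = rnn_step(x_t, h, w_xh, w_hh, b_h)
    return h

# Illustrative sizes: n = 1 sample, d = 2 inputs, h = 2 hidden units.
w_xh = [[0.5, -0.3], [0.1, 0.8]]
w_hh = [[0.2, 0.0], [-0.4, 0.6]]
b_h = [0.0, 0.1]
h0 = [[0.0, 0.0]]

# Two sequences that differ only at the first time step.
seq_a = [[[1.0, 0.0]], [[0.0, 1.0]], [[1.0, 1.0]]]
seq_b = [[[-1.0, 0.0]], [[0.0, 1.0]], [[1.0, 1.0]]]

h_a = run_rnn(seq_a, h0, w_xh, w_hh, b_h)
h_b = run_rnn(seq_b, h0, w_xh, w_hh, b_h)
print(h_a, h_b)
```

The final hidden states differ even though the last two inputs are identical, which is the sense in which the hidden state carries history from earlier time steps.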

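The figure below shows the computation of a recurrent neural network over three adjacent time steps. At time step $t$, computing the hidden state can be viewed as concatenating the input $\boldsymbol{X}_t$ with the previous hidden state $\boldsymbol{H}_{t-1}$ and feeding the result into a fully connected layer with activation function $\phi$. The output of that layer is the current hidden state $\boldsymbol{H}_t$; its weights are the concatenation of $\boldsymbol{W}_{xh}$ and $\boldsymbol{W}_{hh}$, and its bias is $\boldsymbol{b}_h$. The hidden state $\boldsymbol{H}_t$ of time step $t$ takes part in computing the hidden state $\boldsymbol{H}_{t+1}$ of the next time step $t+1$, and is also fed into the fully connected output layer of the current time step.

Figure: a recurrent neural network with hidden state.

As just noted, computing $\boldsymbol{X}_t \boldsymbol{W}_{xh} + \boldsymbol{H}_{t-1} \boldsymbol{W}_{hh}$ for the hidden state is equivalent to multiplying the concatenation of $\boldsymbol{X}_t$ and $\boldsymbol{H}_{t-1}$ (along the columns) by the concatenation of $\boldsymbol{W}_{xh}$ and $\boldsymbol{W}_{hh}$ (along the rows). The call method of class BasicRNNCell(LayerRNNCell), and the call methods of the cells covered later, all use this trick: one larger matrix multiplication replaces two smaller ones, which makes the computation more efficient.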
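The equivalence is easy to check numerically. The sketch below uses plain Python lists and made-up values; `matmul` and `add` are small helpers written for this example, and the variable `kernel` plays the role that `self._kernel` with shape `[input_depth + num_units, num_units]` plays in `BasicRNNCell.build`.

```python
def matmul(a, b):
    """Multiply two matrices stored as lists of rows."""
    return [[sum(x * y for x, y in zip(row, col)) for col in zip(*b)] for row in a]

def add(a, b):
    """Elementwise sum of two equally shaped matrices."""
    return [[x + y for x, y in zip(ra, rb)] for ra, rb in zip(a, b)]

x_t = [[1.0, 2.0]]        # n = 1 sample, d = 2 inputs
h_prev = [[3.0]]          # n = 1 sample, h = 1 hidden unit
w_xh = [[0.5], [-1.0]]    # d x h
w_hh = [[2.0]]            # h x h

# Two separate products, then a sum.
separate = add(matmul(x_t, w_xh), matmul(h_prev, w_hh))

# One product: concatenate inputs along columns and weights along rows.
xh = [x_row + h_row for x_row, h_row in zip(x_t, h_prev)]   # n x (d + h)
kernel = w_xh + w_hh                                         # (d + h) x h
fused = matmul(xh, kernel)

print(separate, fused)
```

Both routes give the same matrix; the fused form simply lets the framework issue a single matmul per step.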

class GRUCell(LayerRNNCell)

@tf_export(v1=["nn.rnn_cell.GRUCell"])
class GRUCell(LayerRNNCell):
  """Gated Recurrent Unit cell (cf. http://arxiv.org/abs/1406.1078).

  Note that this cell is not optimized for performance. Please use
  `tf.contrib.cudnn_rnn.CudnnGRU` for better performance on GPU, or
  `tf.contrib.rnn.GRUBlockCellV2` for better performance on CPU.

  Args:
    num_units: int, The number of units in the GRU cell.
    activation: Nonlinearity to use. Default: `tanh`.
    reuse: (optional) Python boolean describing whether to reuse variables
      in an existing scope. If not `True`, and the existing scope already has
      the given variables, an error is raised.
    kernel_initializer: (optional) The initializer to use for the weight and
      projection matrices.
    bias_initializer: (optional) The initializer to use for the bias.
    name: String, the name of the layer. Layers with the same name will
      share weights, but to avoid mistakes we require reuse=True in such
      cases.
    dtype: Default dtype of the layer (default of `None` means use the type
      of the first input). Required when `build` is called before `call`.
    **kwargs: Dict, keyword named properties for common layer attributes, like
      `trainable` etc when constructing the cell from configs of get_config().
  """

  @deprecated(None, "This class is equivalent as tf.keras.layers.GRUCell,"
              " and will be replaced by that in Tensorflow 2.0.")
  def __init__(self,
               num_units,
               activation=None,
               reuse=None,
               kernel_initializer=None,
               bias_initializer=None,
               name=None,
               dtype=None,
               **kwargs):
    super(GRUCell, self).__init__(
        _reuse=reuse, name=name, dtype=dtype, **kwargs)

    if context.executing_eagerly() and context.num_gpus() > 0:
      logging.warn("%s: Note that this cell is not optimized for performance. "
                   "Please use tf.contrib.cudnn_rnn.CudnnGRU for better "
                   "performance on GPU.", self)
    # Inputs must be 2-dimensional.
    self.input_spec = input_spec.InputSpec(ndim=2)

    self._num_units = num_units
    if activation:
      self._activation = activations.get(activation)
    else:
      self._activation = math_ops.tanh
    self._kernel_initializer = initializers.get(kernel_initializer)
    self._bias_initializer = initializers.get(bias_initializer)

  @property
  def state_size(self):
    return self._num_units

  @property
  def output_size(self):
    return self._num_units

  @tf_utils.shape_type_conversion
  def build(self, inputs_shape):
    if inputs_shape[-1] is None:
      raise ValueError("Expected inputs.shape[-1] to be known, saw shape: %s"
                       % str(inputs_shape))

    input_depth = inputs_shape[-1]
    self._gate_kernel = self.add_variable(
        "gates/%s" % _WEIGHTS_VARIABLE_NAME,
        shape=[input_depth + self._num_units, 2 * self._num_units],
        initializer=self._kernel_initializer)
    self._gate_bias = self.add_variable(
        "gates/%s" % _BIAS_VARIABLE_NAME,
        shape=[2 * self._num_units],
        initializer=(
            self._bias_initializer
            if self._bias_initializer is not None
            else init_ops.constant_initializer(1.0, dtype=self.dtype)))
    self._candidate_kernel = self.add_variable(
        "candidate/%s" % _WEIGHTS_VARIABLE_NAME,
        shape=[input_depth + self._num_units, self._num_units],
        initializer=self._kernel_initializer)
    self._candidate_bias = self.add_variable(
        "candidate/%s" % _BIAS_VARIABLE_NAME,
        shape=[self._num_units],
        initializer=(
            self._bias_initializer
            if self._bias_initializer is not None
            else init_ops.zeros_initializer(dtype=self.dtype)))

    self.built = True

  def call(self, inputs, state):
    """Gated recurrent unit (GRU) with nunits cells."""

    gate_inputs = math_ops.matmul(
        array_ops.concat([inputs, state], 1), self._gate_kernel)
    gate_inputs = nn_ops.bias_add(gate_inputs, self._gate_bias)

    value = math_ops.sigmoid(gate_inputs)
    r, u = array_ops.split(value=value, num_or_size_splits=2, axis=1)

    r_state = r * state

    candidate = math_ops.matmul(
        array_ops.concat([inputs, r_state], 1), self._candidate_kernel)
    candidate = nn_ops.bias_add(candidate, self._candidate_bias)

    c = self._activation(candidate)
    new_h = u * state + (1 - u) * c
    return new_h, new_h

  def get_config(self):
    config = {
        "num_units": self._num_units,
        "kernel_initializer": initializers.serialize(self._kernel_initializer),
        "bias_initializer": initializers.serialize(self._bias_initializer),
        "activation": activations.serialize(self._activation),
        "reuse": self._reuse,
    }
    base_config = super(GRUCell, self).get_config()
    return dict(list(base_config.items()) + list(config.items()))

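Gated recurrent unit (GRU): https://zh.diveintodeeplearning.org/chapter_recurrent-neural-networks/gru.html

Gated Recurrent Unit (GRU)

We have seen that when the number of time steps is large, or the time step is small, the gradients of a recurrent neural network readily decay or explode. Gradient clipping copes with exploding gradients but cannot solve gradient decay. For this reason, recurrent neural networks in practice usually struggle to capture dependencies that span many time steps in a time series.

Gated recurrent neural networks were proposed precisely to capture such long-range dependencies better. They control the flow of information through learnable gates. The gated recurrent unit (GRU) is one commonly used gated recurrent neural network.

The gated recurrent unit

The design of the gated recurrent unit is described below. It introduces a reset gate and an update gate, which change how the hidden state of a recurrent neural network is computed.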

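Reset gate and update gate

As Figure 6.4 shows, the reset gate and the update gate of a gated recurrent unit both take the current input $\boldsymbol{X}_t$ and the previous hidden state $\boldsymbol{H}_{t-1}$ as input, and their outputs are computed by a fully connected layer whose activation function is the sigmoid.

Figure: computing the reset gate and the update gate in a gated recurrent unit.

Specifically, let the number of hidden units be $h$, and suppose we are given the minibatch input $\boldsymbol{X}_t \in \mathbb{R}^{n \times d}$ at time step $t$ ($n$ samples, $d$ inputs) and the previous hidden state $\boldsymbol{H}_{t-1} \in \mathbb{R}^{n \times h}$. The reset gate $\boldsymbol{R}_t \in \mathbb{R}^{n \times h}$ and the update gate $\boldsymbol{Z}_t \in \mathbb{R}^{n \times h}$ are computed as follows:

$$\begin{aligned}
\boldsymbol{R}_t &= \sigma(\boldsymbol{X}_t \boldsymbol{W}_{xr} + \boldsymbol{H}_{t-1} \boldsymbol{W}_{hr} + \boldsymbol{b}_r),\\
\boldsymbol{Z}_t &= \sigma(\boldsymbol{X}_t \boldsymbol{W}_{xz} + \boldsymbol{H}_{t-1} \boldsymbol{W}_{hz} + \boldsymbol{b}_z),
\end{aligned}$$

where $\boldsymbol{W}_{xr}, \boldsymbol{W}_{xz} \in \mathbb{R}^{d \times h}$ and $\boldsymbol{W}_{hr}, \boldsymbol{W}_{hz} \in \mathbb{R}^{h \times h}$ are weight parameters and $\boldsymbol{b}_r, \boldsymbol{b}_z \in \mathbb{R}^{1 \times h}$ are bias parameters. As described in the section on multilayer perceptrons, the sigmoid function maps each value into the range between 0 and 1. Every element of the reset gate $\boldsymbol{R}_t$ and the update gate $\boldsymbol{Z}_t$ therefore lies in $[0, 1]$.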

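Candidate hidden state

Next, the gated recurrent unit computes a candidate hidden state to assist the later hidden state computation. As the figure shows, the output of the current reset gate is multiplied elementwise (symbol $\odot$) with the previous hidden state. If an element of the reset gate is close to 0, the corresponding hidden state element is reset to 0, that is, the previous hidden state is discarded. If the element is close to 1, the previous hidden state is kept. The result of the elementwise product is then concatenated with the current input and passed through a fully connected layer with tanh activation to produce the candidate hidden state, whose elements all lie in $[-1, 1]$.

Figure: computing the candidate hidden state in a gated recurrent unit. The multiplication sign denotes elementwise multiplication.

Specifically, the candidate hidden state $\tilde{\boldsymbol{H}}_t \in \mathbb{R}^{n \times h}$ at time step $t$ is computed as

$$\tilde{\boldsymbol{H}}_t = \text{tanh}(\boldsymbol{X}_t \boldsymbol{W}_{xh} + \left(\boldsymbol{R}_t \odot \boldsymbol{H}_{t-1}\right) \boldsymbol{W}_{hh} + \boldsymbol{b}_h),$$

where $\boldsymbol{W}_{xh} \in \mathbb{R}^{d \times h}$ and $\boldsymbol{W}_{hh} \in \mathbb{R}^{h \times h}$ are weight parameters and $\boldsymbol{b}_h \in \mathbb{R}^{1 \times h}$ is the bias parameter. The formula shows that the reset gate controls how the previous hidden state flows into the current candidate hidden state. The previous hidden state may contain the entire history of the time series up to the previous time step, so the reset gate can be used to discard history that is irrelevant to the prediction.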

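Hidden state

Finally, the hidden state $\boldsymbol{H}_t \in \mathbb{R}^{n \times h}$ at time step $t$ uses the current update gate $\boldsymbol{Z}_t$ to combine the previous hidden state $\boldsymbol{H}_{t-1}$ with the current candidate hidden state $\tilde{\boldsymbol{H}}_t$:

$$\boldsymbol{H}_t = \boldsymbol{Z}_t \odot \boldsymbol{H}_{t-1} + (1 - \boldsymbol{Z}_t) \odot \tilde{\boldsymbol{H}}_t.$$

Figure: computing the hidden state in a gated recurrent unit. The multiplication sign denotes elementwise multiplication.

It is worth noting that the update gate controls how the hidden state is updated by the candidate hidden state, which carries the current time step's information, as the figure shows. Suppose the update gate stays close to 1 from time step $t'$ to $t$ ($t' < t$). Then the input information between time steps $t'$ and $t$ hardly flows into the hidden state $\boldsymbol{H}_t$ at time step $t$. In effect, the hidden state $\boldsymbol{H}_{t'-1}$ from the earlier time step is preserved through time and passed on to the current time step $t$. This design can cope with gradient decay in recurrent neural networks and better capture dependencies that span many time steps in a time series.

To summarize the design of the gated recurrent unit:

  • The reset gate helps capture short-term dependencies in a time series.
  • The update gate helps capture long-term dependencies in a time series.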
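The following plain-Python sketch follows the same sequence of operations as `GRUCell.call` listed earlier (fused gate matmul, split into `r` and `u`, candidate from the reset-scaled state, then `u * state + (1 - u) * c`). It is an illustration under assumptions, not the TensorFlow implementation: `gru_step`, `matmul`, and `sigmoid` are helpers written for this example, and the all-zero kernels and extreme bias values are chosen only to push the update gate toward 1 or 0.

```python
import math

def matmul(a, b):
    """Multiply two matrices stored as lists of rows."""
    return [[sum(x * y for x, y in zip(row, col)) for col in zip(*b)] for row in a]

def sigmoid(v):
    """Numerically stable logistic function."""
    if v >= 0:
        return 1.0 / (1.0 + math.exp(-v))
    e = math.exp(v)
    return e / (1.0 + e)

def gru_step(x, h_prev, gate_kernel, gate_bias, cand_kernel, cand_bias):
    """One GRU step: returns the new hidden state for a batch of rows."""
    num_units = len(h_prev[0])
    xh = [x_row + h_row for x_row, h_row in zip(x, h_prev)]   # concat on axis 1
    gates = [[sigmoid(v + b) for v, b in zip(row, gate_bias)]
             for row in matmul(xh, gate_kernel)]
    r = [row[:num_units] for row in gates]     # reset gate
    u = [row[num_units:] for row in gates]     # update gate
    x_rh = [x_row + [ri * hi for ri, hi in zip(r_row, h_row)]
            for x_row, r_row, h_row in zip(x, r, h_prev)]
    c = [[math.tanh(v + b) for v, b in zip(row, cand_bias)]
         for row in matmul(x_rh, cand_kernel)]
    return [[ui * hi + (1.0 - ui) * ci for ui, hi, ci in zip(u_row, h_row, c_row)]
            for u_row, h_row, c_row in zip(u, h_prev, c)]

# Illustrative sizes: 1 sample, 1 input feature, 2 hidden units.
x = [[1.0]]
h_prev = [[0.3, -0.7]]
gate_kernel = [[0.0] * 4 for _ in range(3)]   # (d + h) x 2h, all zeros
cand_kernel = [[0.0] * 2 for _ in range(3)]   # (d + h) x h, all zeros
cand_bias = [0.5, -0.5]

# Update gate pushed toward 1: the previous state is carried over.
h_keep = gru_step(x, h_prev, gate_kernel, [0.0, 0.0, 50.0, 50.0], cand_kernel, cand_bias)
# Update gate pushed toward 0: the state is replaced by the candidate.
h_replace = gru_step(x, h_prev, gate_kernel, [0.0, 0.0, -50.0, -50.0], cand_kernel, cand_bias)
print(h_keep, h_replace)
```

With the update gate saturated near 1 the step returns the old state almost unchanged, which is the mechanism the summary credits for long-term dependencies; with the gate near 0 the state is overwritten by the candidate.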

class LSTMStateTuple

_LSTMStateTuple = collections.namedtuple("LSTMStateTuple", ("c", "h"))


@tf_export("nn.rnn_cell.LSTMStateTuple")
class LSTMStateTuple(_LSTMStateTuple):
  """Tuple used by LSTM Cells for `state_size`, `zero_state`, and output state.

  Stores two elements: `(c, h)`, in that order. Where `c` is the hidden state
  and `h` is the output.

  Only used when `state_is_tuple=True`.
  """
  __slots__ = ()

  @property
  def dtype(self):
    (c, h) = self
    if c.dtype != h.dtype:
      raise TypeError("Inconsistent internal state: %s vs %s" %
                      (str(c.dtype), str(h.dtype)))
    return c.dtype
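Because `LSTMStateTuple` is built on `collections.namedtuple`, it behaves like an ordinary 2-tuple with named fields. The sketch below recreates only that namedtuple part with the standard library; the list values stand in for tensors and are assumptions made for illustration, and the `dtype` property of the real class is not reproduced.

```python
import collections

# Same field order as in the listing: cell state first, then output.
LSTMStateTuple = collections.namedtuple("LSTMStateTuple", ("c", "h"))

state = LSTMStateTuple(c=[0.0, 0.0], h=[1.0, 1.0])

# Access by name or by position, or unpack like a plain tuple.
c, h = state
print(state.c, state[1], isinstance(state, tuple))

# namedtuples are immutable; _replace returns a new tuple with one field changed.
next_state = state._replace(h=[0.5, 0.5])
print(next_state)
```

The `(c, h)` unpacking is exactly what `BasicLSTMCell.call` does with its `state` argument when `state_is_tuple=True`.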

class BasicLSTMCell(LayerRNNCell)

@tf_export(v1=["nn.rnn_cell.BasicLSTMCell"])
class BasicLSTMCell(LayerRNNCell):
  """DEPRECATED: Please use `tf.nn.rnn_cell.LSTMCell` instead.

  Basic LSTM recurrent network cell.

  The implementation is based on: http://arxiv.org/abs/1409.2329.

  We add forget_bias (default: 1) to the biases of the forget gate in order to
  reduce the scale of forgetting in the beginning of the training.

  It does not allow cell clipping, a projection layer, and does not
  use peep-hole connections: it is the basic baseline.

  For advanced models, please use the full `tf.nn.rnn_cell.LSTMCell`
  that follows.

  Note that this cell is not optimized for performance. Please use
  `tf.contrib.cudnn_rnn.CudnnLSTM` for better performance on GPU, or
  `tf.contrib.rnn.LSTMBlockCell` and `tf.contrib.rnn.LSTMBlockFusedCell` for
  better performance on CPU.
  """

  @deprecated(None, "This class is equivalent as tf.keras.layers.LSTMCell,"
              " and will be replaced by that in Tensorflow 2.0.")
  def __init__(self,
               num_units,
               forget_bias=1.0,
               state_is_tuple=True,
               activation=None,
               reuse=None,
               name=None,
               dtype=None,
               **kwargs):
    """Initialize the basic LSTM cell.

    Args:
      num_units: int, The number of units in the LSTM cell.
      forget_bias: float, The bias added to forget gates (see above).
        Must set to `0.0` manually when restoring from CudnnLSTM-trained
        checkpoints.
      state_is_tuple: If True, accepted and returned states are 2-tuples of
        the `c_state` and `m_state`. If False, they are concatenated
        along the column axis. The latter behavior will soon be deprecated.
      activation: Activation function of the inner states. Default: `tanh`. It
        could also be string that is within Keras activation function names.
      reuse: (optional) Python boolean describing whether to reuse variables
        in an existing scope. If not `True`, and the existing scope already has
        the given variables, an error is raised.
      name: String, the name of the layer. Layers with the same name will
        share weights, but to avoid mistakes we require reuse=True in such
        cases.
      dtype: Default dtype of the layer (default of `None` means use the type
        of the first input). Required when `build` is called before `call`.
      **kwargs: Dict, keyword named properties for common layer attributes, like
        `trainable` etc when constructing the cell from configs of get_config().

      When restoring from CudnnLSTM-trained checkpoints, must use
      `CudnnCompatibleLSTMCell` instead.
    """
    super(BasicLSTMCell, self).__init__(
        _reuse=reuse, name=name, dtype=dtype, **kwargs)
    if not state_is_tuple:
      logging.warn("%s: Using a concatenated state is slower and will soon be "
                   "deprecated. Use state_is_tuple=True.", self)
    if context.executing_eagerly() and context.num_gpus() > 0:
      logging.warn("%s: Note that this cell is not optimized for performance. "
                   "Please use tf.contrib.cudnn_rnn.CudnnLSTM for better "
                   "performance on GPU.", self)

    # Inputs must be 2-dimensional.
    self.input_spec = input_spec.InputSpec(ndim=2)

    self._num_units = num_units
    self._forget_bias = forget_bias
    self._state_is_tuple = state_is_tuple
    if activation:
      self._activation = activations.get(activation)
    else:
      self._activation = math_ops.tanh

  @property
  def state_size(self):
    return (LSTMStateTuple(self._num_units, self._num_units)
            if self._state_is_tuple else 2 * self._num_units)

  @property
  def output_size(self):
    return self._num_units

  @tf_utils.shape_type_conversion
  def build(self, inputs_shape):
    if inputs_shape[-1] is None:
      raise ValueError("Expected inputs.shape[-1] to be known, saw shape: %s"
                       % str(inputs_shape))

    input_depth = inputs_shape[-1]
    h_depth = self._num_units
    self._kernel = self.add_variable(
        _WEIGHTS_VARIABLE_NAME,
        shape=[input_depth + h_depth, 4 * self._num_units])
    self._bias = self.add_variable(
        _BIAS_VARIABLE_NAME,
        shape=[4 * self._num_units],
        initializer=init_ops.zeros_initializer(dtype=self.dtype))

    self.built = True

  def call(self, inputs, state):
    """Long short-term memory cell (LSTM).

    Args:
      inputs: `2-D` tensor with shape `[batch_size, input_size]`.
      state: An `LSTMStateTuple` of state tensors, each shaped
        `[batch_size, num_units]`, if `state_is_tuple` has been set to
        `True`. Otherwise, a `Tensor` shaped
        `[batch_size, 2 * num_units]`.

    Returns:
      A pair containing the new hidden state, and the new state (either a
        `LSTMStateTuple` or a concatenated state, depending on
        `state_is_tuple`).
    """
    sigmoid = math_ops.sigmoid
    one = constant_op.constant(1, dtype=dtypes.int32)
    # Parameters of gates are concatenated into one multiply for efficiency.
    if self._state_is_tuple:
      c, h = state
    else:
      c, h = array_ops.split(value=state, num_or_size_splits=2, axis=one)

    gate_inputs = math_ops.matmul(
        array_ops.concat([inputs, h], 1), self._kernel)
    gate_inputs = nn_ops.bias_add(gate_inputs, self._bias)

    # i = input_gate, j = new_input, f = forget_gate, o = output_gate
    i, j, f, o = array_ops.split(
        value=gate_inputs, num_or_size_splits=4, axis=one)

    forget_bias_tensor = constant_op.constant(self._forget_bias, dtype=f.dtype)
    # Note that using `add` and `multiply` instead of `+` and `*` gives a
    # performance improvement. So using those at the cost of readability.
    add = math_ops.add
    multiply = math_ops.multiply
    new_c = add(multiply(c, sigmoid(add(f, forget_bias_tensor))),
                multiply(sigmoid(i), self._activation(j)))
    new_h = multiply(self._activation(new_c), sigmoid(o))

    if self._state_is_tuple:
      new_state = LSTMStateTuple(new_c, new_h)
    else:
      new_state = array_ops.concat([new_c, new_h], 1)
    return new_h, new_state

  def get_config(self):
    config = {
        "num_units": self._num_units,
        "forget_bias": self._forget_bias,
        "state_is_tuple": self._state_is_tuple,
        "activation": activations.serialize(self._activation),
        "reuse": self._reuse,
    }
    base_config = super(BasicLSTMCell, self).get_config()
    return dict(list(base_config.items()) + list(config.items()))

What forget_bias does
forget_bias: Biases of the forget gate are initialized by default to 1 in order to reduce the scale of forgetting at the beginning of the training. Must set it manually to 0.0 when restoring from CudnnLSTM trained checkpoints.
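A quick numeric check shows why a forget bias of 1 reduces forgetting early in training. The sketch assumes the forget-gate pre-activation `f` is about 0 at initialization (a simplification: the gate bias is zero-initialized in `build`, and the weighted inputs are taken to be small); the 10-step horizon is an arbitrary choice for illustration.

```python
import math

def sigmoid(v):
    return 1.0 / (1.0 + math.exp(-v))

f = 0.0  # assumed forget-gate pre-activation right after initialization

gate_without_bias = sigmoid(f)        # exactly 0.5
gate_with_bias = sigmoid(f + 1.0)     # about 0.73, as in sigmoid(add(f, forget_bias))

# Fraction of the initial cell state that survives 10 steps if the gate stays
# at that value and nothing new is written.
steps = 10
retained_without_bias = gate_without_bias ** steps
retained_with_bias = gate_with_bias ** steps

print(gate_without_bias, gate_with_bias)
print(retained_without_bias, retained_with_bias)
```

Under these assumptions the cell state decays far more slowly with the bias in place, so gradients flowing through the cell state also shrink more slowly at the start of training.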

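Long Short-Term Memory (LSTM)

This section introduces another commonly used gated recurrent neural network: long short-term memory (LSTM). Its structure is slightly more complex than that of the gated recurrent unit.

Long short-term memory

LSTM introduces three gates, the input gate, the forget gate, and the output gate, together with a memory cell that has the same shape as the hidden state (some literature treats the memory cell as a special kind of hidden state) and records additional information.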

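Input gate, forget gate, and output gate

As with the reset gate and update gate of the gated recurrent unit, and as the figure shows, the gates of an LSTM all take the current input $\boldsymbol{X}_t$ and the previous hidden state $\boldsymbol{H}_{t-1}$ as input, and their outputs are computed by a fully connected layer whose activation function is the sigmoid. The elements of all three gates therefore lie in $[0,1]$.

Figure: computing the input gate, forget gate, and output gate in an LSTM.

Specifically, let the number of hidden units be $h$, and suppose we are given the minibatch input $\boldsymbol{X}_t \in \mathbb{R}^{n \times d}$ at time step $t$ ($n$ samples, $d$ inputs) and the previous hidden state $\boldsymbol{H}_{t-1} \in \mathbb{R}^{n \times h}$.
The input gate $\boldsymbol{I}_t \in \mathbb{R}^{n \times h}$, forget gate $\boldsymbol{F}_t \in \mathbb{R}^{n \times h}$, and output gate $\boldsymbol{O}_t \in \mathbb{R}^{n \times h}$ at time step $t$ are computed as follows:

$$\begin{aligned}
\boldsymbol{I}_t &= \sigma(\boldsymbol{X}_t \boldsymbol{W}_{xi} + \boldsymbol{H}_{t-1} \boldsymbol{W}_{hi} + \boldsymbol{b}_i),\\
\boldsymbol{F}_t &= \sigma(\boldsymbol{X}_t \boldsymbol{W}_{xf} + \boldsymbol{H}_{t-1} \boldsymbol{W}_{hf} + \boldsymbol{b}_f),\\
\boldsymbol{O}_t &= \sigma(\boldsymbol{X}_t \boldsymbol{W}_{xo} + \boldsymbol{H}_{t-1} \boldsymbol{W}_{ho} + \boldsymbol{b}_o),
\end{aligned}$$

where $\boldsymbol{W}_{xi}, \boldsymbol{W}_{xf}, \boldsymbol{W}_{xo} \in \mathbb{R}^{d \times h}$ and $\boldsymbol{W}_{hi}, \boldsymbol{W}_{hf}, \boldsymbol{W}_{ho} \in \mathbb{R}^{h \times h}$ are weight parameters and $\boldsymbol{b}_i, \boldsymbol{b}_f, \boldsymbol{b}_o \in \mathbb{R}^{1 \times h}$ are bias parameters.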

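Candidate memory cell

Next, the LSTM computes the candidate memory cell $\tilde{\boldsymbol{C}}_t$. The computation resembles that of the three gates above, but uses the tanh function, whose range is $[-1, 1]$, as the activation function, as the figure shows.

Figure: computing the candidate memory cell in an LSTM.

Specifically, the candidate memory cell $\tilde{\boldsymbol{C}}_t \in \mathbb{R}^{n \times h}$ at time step $t$ is computed as

$$\tilde{\boldsymbol{C}}_t = \text{tanh}(\boldsymbol{X}_t \boldsymbol{W}_{xc} + \boldsymbol{H}_{t-1} \boldsymbol{W}_{hc} + \boldsymbol{b}_c),$$

where $\boldsymbol{W}_{xc} \in \mathbb{R}^{d \times h}$ and $\boldsymbol{W}_{hc} \in \mathbb{R}^{h \times h}$ are weight parameters and $\boldsymbol{b}_c \in \mathbb{R}^{1 \times h}$ is the bias parameter.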

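Memory cell

The input gate, forget gate, and output gate, whose elements lie in $[0, 1]$, control the flow of information in the hidden state, generally through elementwise multiplication (symbol $\odot$). The memory cell $\boldsymbol{C}_t \in \mathbb{R}^{n \times h}$ at the current time step combines the information of the previous memory cell and the current candidate memory cell, with the forget gate and input gate controlling the flow:

$$\boldsymbol{C}_t = \boldsymbol{F}_t \odot \boldsymbol{C}_{t-1} + \boldsymbol{I}_t \odot \tilde{\boldsymbol{C}}_t.$$

As the figure shows, the forget gate controls whether the information in the previous memory cell $\boldsymbol{C}_{t-1}$ is passed on to the current time step, while the input gate controls how the current input $\boldsymbol{X}_t$ flows into the current memory cell through the candidate memory cell $\tilde{\boldsymbol{C}}_t$. If the forget gate stays close to 1 and the input gate stays close to 0, the past memory cell is preserved through time and passed on to the current time step. This design can cope with gradient decay in recurrent neural networks and better capture dependencies that span many time steps in a time series.

Figure: computing the memory cell in an LSTM. The multiplication sign denotes elementwise multiplication.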

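Hidden state

With the memory cell in place, the output gate then controls the flow of information from the memory cell to the hidden state $\boldsymbol{H}_t \in \mathbb{R}^{n \times h}$:

$$\boldsymbol{H}_t = \boldsymbol{O}_t \odot \text{tanh}(\boldsymbol{C}_t).$$

The tanh function here ensures that the elements of the hidden state lie between -1 and 1. Note that when the output gate is close to 1, the memory cell information is passed to the hidden state for use by the output layer; when the output gate is close to 0, the memory cell keeps the information to itself. Figure 6.10 shows the computation of the hidden state in an LSTM.

Figure: computing the hidden state in an LSTM. The multiplication sign denotes elementwise multiplication.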
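The sketch below strings the gates, candidate, memory cell, and hidden state together in plain Python, in the same order as `BasicLSTMCell.call` listed earlier: one fused matmul, a four-way split into `i, j, f, o`, `forget_bias` added to `f`, then the cell and hidden updates. It is an illustration under assumptions rather than the TensorFlow code; `lstm_step` and its helpers are written for this example, and the zero kernel and extreme biases are chosen only to hold the forget gate near 1 and the input gate near 0.

```python
import math

def matmul(a, b):
    """Multiply two matrices stored as lists of rows."""
    return [[sum(x * y for x, y in zip(row, col)) for col in zip(*b)] for row in a]

def sigmoid(v):
    """Numerically stable logistic function."""
    if v >= 0:
        return 1.0 / (1.0 + math.exp(-v))
    e = math.exp(v)
    return e / (1.0 + e)

def lstm_step(x, c_prev, h_prev, kernel, bias, forget_bias=1.0):
    """One LSTM step; returns (new_c, new_h) for a batch of rows."""
    n = len(h_prev[0])
    xh = [x_row + h_row for x_row, h_row in zip(x, h_prev)]   # concat on axis 1
    pre = [[v + b for v, b in zip(row, bias)] for row in matmul(xh, kernel)]
    new_c, new_h = [], []
    for row, c_row in zip(pre, c_prev):
        # Column blocks in the listing's order:
        # i = input gate, j = new input (candidate), f = forget gate, o = output gate.
        i, j, f, o = (row[k * n:(k + 1) * n] for k in range(4))
        c_new = [c * sigmoid(fk + forget_bias) + sigmoid(ik) * math.tanh(jk)
                 for c, ik, jk, fk in zip(c_row, i, j, f)]
        new_c.append(c_new)
        new_h.append([math.tanh(c) * sigmoid(ok) for c, ok in zip(c_new, o)])
    return new_c, new_h

# Illustrative sizes: 1 sample, 1 input feature, 1 hidden unit.
kernel = [[0.0] * 4 for _ in range(2)]   # (d + h) x 4h, all zeros
bias = [-50.0, 0.0, 50.0, 0.0]           # input gate near 0, forget gate near 1
c, h = [[0.8]], [[0.1]]

for _ in range(100):
    c, h = lstm_step([[1.0]], c, h, kernel, bias)
print(c, h)
```

After 100 steps the memory cell still holds its initial value, matching the statement that a forget gate near 1 together with an input gate near 0 carries the past memory cell forward through time.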

class LSTMCell(LayerRNNCell)

@tf_export(v1=["nn.rnn_cell.LSTMCell"])
class LSTMCell(LayerRNNCell):
  """Long short-term memory unit (LSTM) recurrent network cell.

  The default non-peephole implementation is based on:

    https://pdfs.semanticscholar.org/1154/0131eae85b2e11d53df7f1360eeb6476e7f4.pdf

  Felix Gers, Jurgen Schmidhuber, and Fred Cummins.
  "Learning to forget: Continual prediction with LSTM." IET, 850-855, 1999.

  The peephole implementation is based on:

    https://research.google.com/pubs/archive/43905.pdf

  Hasim Sak, Andrew Senior, and Francoise Beaufays.
  "Long short-term memory recurrent neural network architectures for
   large scale acoustic modeling." INTERSPEECH, 2014.

  The class uses optional peep-hole connections, optional cell clipping, and
  an optional projection layer.

  Note that this cell is not optimized for performance. Please use
  `tf.contrib.cudnn_rnn.CudnnLSTM` for better performance on GPU, or
  `tf.contrib.rnn.LSTMBlockCell` and `tf.contrib.rnn.LSTMBlockFusedCell` for
  better performance on CPU.
  """

  @deprecated(None, "This class is equivalent as tf.keras.layers.LSTMCell,"
              " and will be replaced by that in Tensorflow 2.0.")
  def __init__(self, num_units,
               use_peepholes=False, cell_clip=None,
               initializer=None, num_proj=None, proj_clip=None,
               num_unit_shards=None, num_proj_shards=None,
               forget_bias=1.0, state_is_tuple=True,
               activation=None, reuse=None, name=None, dtype=None, **kwargs):
    """Initialize the parameters for an LSTM cell.

    Args:
      num_units: int, The number of units in the LSTM cell.
      use_peepholes: bool, set True to enable diagonal/peephole connections.
      cell_clip: (optional) A float value, if provided the cell state is clipped
        by this value prior to the cell output activation.
      initializer: (optional) The initializer to use for the weight and
        projection matrices.
      num_proj: (optional) int, The output dimensionality for the projection
        matrices. If None, no projection is performed.
      proj_clip: (optional) A float value. If `num_proj > 0` and `proj_clip` is
        provided, then the projected values are clipped elementwise to within
        `[-proj_clip, proj_clip]`.
      num_unit_shards: Deprecated, will be removed by Jan. 2017.
        Use a variable_scope partitioner instead.
      num_proj_shards: Deprecated, will be removed by Jan. 2017.
        Use a variable_scope partitioner instead.
      forget_bias: Biases of the forget gate are initialized by default to 1
        in order to reduce the scale of forgetting at the beginning of
        the training. Must set it manually to `0.0` when restoring from
        CudnnLSTM trained checkpoints.
      state_is_tuple: If True, accepted and returned states are 2-tuples of
        the `c_state` and `m_state`. If False, they are concatenated
        along the column axis. This latter behavior will soon be deprecated.
      activation: Activation function of the inner states. Default: `tanh`. It
        could also be string that is within Keras activation function names.
      reuse: (optional) Python boolean describing whether to reuse variables
        in an existing scope. If not `True`, and the existing scope already has
        the given variables, an error is raised.
      name: String, the name of the layer. Layers with the same name will
        share weights, but to avoid mistakes we require reuse=True in such
        cases.
      dtype: Default dtype of the layer (default of `None` means use the type
        of the first input). Required when `build` is called before `call`.
      **kwargs: Dict, keyword named properties for common layer attributes, like
        `trainable` etc when constructing the cell from configs of get_config().

      When restoring from CudnnLSTM-trained checkpoints, use
      `CudnnCompatibleLSTMCell` instead.
    """
    super(LSTMCell, self).__init__(
        _reuse=reuse, name=name, dtype=dtype, **kwargs)
    if not state_is_tuple:
      logging.warn("%s: Using a concatenated state is slower and will soon be "
                   "deprecated. Use state_is_tuple=True.", self)
    if num_unit_shards is not None or num_proj_shards is not None:
      logging.warn(
          "%s: The num_unit_shards and proj_unit_shards parameters are "
          "deprecated and will be removed in Jan 2017. "
          "Use a variable scope with a partitioner instead.", self)
    if context.executing_eagerly() and context.num_gpus() > 0:
      logging.warn("%s: Note that this cell is not optimized for performance. "
                   "Please use tf.contrib.cudnn_rnn.CudnnLSTM for better "
                   "performance on GPU.", self)

    # Inputs must be 2-dimensional.
    self.input_spec = input_spec.InputSpec(ndim=2)

    self._num_units = num_units
    self._use_peepholes = use_peepholes
    self._cell_clip = cell_clip
    self._initializer = initializers.get(initializer)
    self._num_proj = num_proj
    self._proj_clip = proj_clip
    self._num_unit_shards = num_unit_shards
    self._num_proj_shards = num_proj_shards
    self._forget_bias = forget_bias
    self._state_is_tuple = state_is_tuple
    if activation:
      self._activation = activations.get(activation)
    else:
      self._activation = math_ops.tanh

    if num_proj:
      self._state_size = (
          LSTMStateTuple(num_units, num_proj)
          if state_is_tuple else num_units + num_proj)
      self._output_size = num_proj
    else:
      self._state_size = (
          LSTMStateTuple(num_units, num_units)
          if state_is_tuple else 2 * num_units)
      self._output_size = num_units

  @property
  def state_size(self):
    return self._state_size

  @property
  def output_size(self):
    return self._output_size

  @tf_utils.shape_type_conversion
  def build(self, inputs_shape):
    if inputs_shape[-1] is None:
      raise ValueError("Expected inputs.shape[-1] to be known, saw shape: %s"
                       % str(inputs_shape))

    input_depth = inputs_shape[-1]
    h_depth = self._num_units if self._num_proj is None else self._num_proj
    maybe_partitioner = (
        partitioned_variables.fixed_size_partitioner(self._num_unit_shards)
        if self._num_unit_shards is not None
        else None)
    self._kernel = self.add_variable(
        _WEIGHTS_VARIABLE_NAME,
        shape=[input_depth + h_depth, 4 * self._num_units],
        initializer=self._initializer,
        partitioner=maybe_partitioner)
    if self.dtype is None:
      initializer = init_ops.zeros_initializer
    else:
      initializer = init_ops.zeros_initializer(dtype=self.dtype)
    self._bias = self.add_variable(
        _BIAS_VARIABLE_NAME,
        shape=[4 * self._num_units],
        initializer=initializer)
    if self._use_peepholes:
      self._w_f_diag = self.add_variable("w_f_diag", shape=[self._num_units],
                                         initializer=self._initializer)
      self._w_i_diag = self.add_variable("w_i_diag", shape=[self._num_units],
                                         initializer=self._initializer)
      self._w_o_diag = self.add_variable("w_o_diag", shape=[self._num_units],
                                         initializer=self._initializer)

    if self._num_proj is not None:
      maybe_proj_partitioner = (
          partitioned_variables.fixed_size_partitioner(self._num_proj_shards)
          if self._num_proj_shards is not None
          else None)
      self._proj_kernel = self.add_variable(
          "projection/%s" % _WEIGHTS_VARIABLE_NAME,
          shape=[self._num_units, self._num_proj],
          initializer=self._initializer,
          partitioner=maybe_proj_partitioner)

    self.built = True

  def call(self, inputs, state):
    """Run one step of LSTM.

    Args:
      inputs: input Tensor, must be 2-D, `[batch, input_size]`.
      state: if `state_is_tuple` is False, this must be a state Tensor,
        `2-D, [batch, state_size]`. If `state_is_tuple` is True, this must be a
        tuple of state Tensors, both `2-D`, with column sizes `c_state` and
        `m_state`.

    Returns:
      A tuple containing:

      - A `2-D, [batch, output_dim]`, Tensor representing the output of the
        LSTM after reading `inputs` when previous state was `state`.
        Here output_dim is:
           num_proj if num_proj was set,
           num_units otherwise.
      - Tensor(s) representing the new state of LSTM after reading `inputs` when
        the previous state was `state`. Same type and shape(s) as `state`.

    Raises:
      ValueError: If input size cannot be inferred from inputs via
        static shape inference.
    """
    num_proj = self._num_units if self._num_proj is None else self._num_proj
    sigmoid = math_ops.sigmoid

    if self._state_is_tuple:
      (c_prev, m_prev) = state
    else:
      c_prev = array_ops.slice(state, [0, 0], [-1, self._num_units])
      m_prev = array_ops.slice(state, [0, self._num_units], [-1, num_proj])

    input_size = inputs.get_shape().with_rank(2).dims[1].value
    if input_size is None:
      raise ValueError("Could not infer input size from inputs.get_shape()[-1]")

    # i = input_gate, j = new_input, f = forget_gate, o = output_gate
    lstm_matrix = math_ops.matmul(
        array_ops.concat([inputs, m_prev], 1), self._kernel)
    lstm_matrix = nn_ops.bias_add(lstm_matrix, self._bias)

    i, j, f, o = array_ops.split(
        value=lstm_matrix, num_or_size_splits=4, axis=1)
    # Diagonal connections
    if self._use_peepholes:
      c = (sigmoid(f + self._forget_bias + self._w_f_diag * c_prev) * c_prev +
           sigmoid(i + self._w_i_diag * c_prev) * self._activation(j))
    else:
      c = (sigmoid(f + self._forget_bias) * c_prev + sigmoid(i) *
           self._activation(j))

    if self._cell_clip is not None:
      # pylint: disable=invalid-unary-operand-type
      c = clip_ops.clip_by_value(c, -self._cell_clip, self._cell_clip)
      # pylint: enable=invalid-unary-operand-type
    if self._use_peepholes:
      m = sigmoid(o + self._w_o_diag * c) * self._activation(c)
    else:
      m = sigmoid(o) * self._activation(c)

    if self._num_proj is not None:
      m = math_ops.matmul(m, self._proj_kernel)

      if self._proj_clip is not None:
        # pylint: disable=invalid-unary-operand-type
        m = clip_ops.clip_by_value(m, -self._proj_clip, self._proj_clip)
        # pylint: enable=invalid-unary-operand-type

    new_state = (LSTMStateTuple(c, m) if self._state_is_tuple else
                 array_ops.concat([c, m], 1))
    return m, new_state

  def get_config(self):
    config = {
        "num_units": self._num_units,
        "use_peepholes": self._use_peepholes,
        "cell_clip": self._cell_clip,
        "initializer": initializers.serialize(self._initializer),
        "num_proj": self._num_proj,
        "proj_clip": self._proj_clip,
        "num_unit_shards": self._num_unit_shards,
        "num_proj_shards": self._num_proj_shards,
        "forget_bias": self._forget_bias,
        "state_is_tuple": self._state_is_tuple,
        "activation": activations.serialize(self._activation),
        "reuse": self._reuse,
    }
    base_config = super(LSTMCell, self).get_config()
    return dict(list(base_config.items()) + list(config.items()))


def _enumerated_map_structure_up_to(shallow_structure, map_fn, *args, **kwargs):
  ix = [0]
  def enumerated_fn(*inner_args, **inner_kwargs):
    r = map_fn(ix[0], *inner_args, **inner_kwargs)
    ix[0] += 1
    return r
  return nest.map_structure_up_to(shallow_structure,
                                  enumerated_fn, *args, **kwargs)


def _default_dropout_state_filter_visitor(substate):
  if isinstance(substate, LSTMStateTuple):
    # Do not perform dropout on the memory state.
    return LSTMStateTuple(c=False, h=True)
  elif isinstance(substate, tensor_array_ops.TensorArray):
    return False
  return True
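To make the fused-kernel layout in `call` easier to follow, here is a minimal pure-Python sketch of one step for a single example, without peepholes, clipping, or projection. The function `lstm_step` and its list-based arguments are assumptions for illustration, not TensorFlow API; only the i, j, f, o column order and the `forget_bias` term are taken from the source quoted above.

```python
import math


def sigmoid(x):
    return 1.0 / (1.0 + math.exp(-x))


def lstm_step(inputs, c_prev, m_prev, kernel, bias, forget_bias=1.0):
    """One LSTM step for a single example, with vectors as flat lists.

    kernel has shape [input_depth + num_units, 4 * num_units]; its columns
    are read as four blocks in the order i, j, f, o.
    """
    num_units = len(c_prev)
    concat = inputs + m_prev  # concat([inputs, m_prev], axis=1) for one row
    z = [sum(x * kernel[r][col] for r, x in enumerate(concat)) + bias[col]
         for col in range(4 * num_units)]
    # Split the pre-activations into input gate, new input, forget gate,
    # and output gate.
    i, j, f, o = (z[k * num_units:(k + 1) * num_units] for k in range(4))
    c = [sigmoid(fv + forget_bias) * cp + sigmoid(iv) * math.tanh(jv)
         for fv, cp, iv, jv in zip(f, c_prev, i, j)]
    m = [sigmoid(ov) * math.tanh(cv) for ov, cv in zip(o, c)]
    return m, (c, m)


num_units, input_depth = 2, 3
kernel = [[0.0] * (4 * num_units) for _ in range(input_depth + num_units)]
bias = [0.0] * (4 * num_units)

m, (c, _) = lstm_step([1.0, 2.0, 3.0], [1.0, -1.0], [0.0, 0.0], kernel, bias)
print(c)  # each entry is sigmoid(1.0) times the previous cell value
print(m)
```

With an all-zero kernel and bias, the forget gate evaluates to `sigmoid(forget_bias)`, about 0.73 for the default of 1.0, rather than 0.5. This is the effect the docstring describes as reducing the scale of forgetting at the start of training.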

Differences from BasicLSTMCell
The class uses optional peep-hole connections, optional cell clipping, and
an optional projection layer.
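How these options change the variables and sizes can be summarized in a short sketch that follows the `__init__` and `build` logic quoted above. The function `lstm_cell_shapes` and its dictionary keys are hypothetical names for this example, and `num_proj` is assumed to be either `None` or a positive integer.

```python
def lstm_cell_shapes(input_depth, num_units, num_proj=None,
                     use_peepholes=False, state_is_tuple=True):
    """Return (variable shapes, state_size, output_size) for an LSTM cell."""
    # With a projection layer, the recurrent input m has num_proj columns.
    h_depth = num_units if num_proj is None else num_proj
    shapes = {
        "kernel": [input_depth + h_depth, 4 * num_units],
        "bias": [4 * num_units],
    }
    if use_peepholes:
        # One diagonal (per-unit) weight vector per gate.
        for name in ("w_f_diag", "w_i_diag", "w_o_diag"):
            shapes[name] = [num_units]
    if num_proj is not None:
        shapes["projection"] = [num_units, num_proj]
    state_size = ((num_units, h_depth) if state_is_tuple
                  else num_units + h_depth)
    return shapes, state_size, h_depth


plain = lstm_cell_shapes(input_depth=10, num_units=128)
projected = lstm_cell_shapes(input_depth=10, num_units=128, num_proj=64,
                             use_peepholes=True)
print(plain)
print(projected)
```

The projection shrinks both the output and the recurrent part of the kernel, while the cell state keeps `num_units` columns.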

One popular LSTM variant, introduced by Gers & Schmidhuber (2000), adds "peephole connections": the gate layers also receive the cell state as input.

Using peephole connections

# In build(): one diagonal weight vector per gate.
if self._use_peepholes:
  self._w_f_diag = self.add_variable("w_f_diag", shape=[self._num_units],
                                     initializer=self._initializer)
  self._w_i_diag = self.add_variable("w_i_diag", shape=[self._num_units],
                                     initializer=self._initializer)
  self._w_o_diag = self.add_variable("w_o_diag", shape=[self._num_units],
                                     initializer=self._initializer)

# In call(): the forget and input gates peek at the previous cell state.
if self._use_peepholes:
  c = (sigmoid(f + self._forget_bias + self._w_f_diag * c_prev) * c_prev +
       sigmoid(i + self._w_i_diag * c_prev) * self._activation(j))
else:
  c = (sigmoid(f + self._forget_bias) * c_prev + sigmoid(i) *
       self._activation(j))

# In call(): the output gate peeks at the new cell state.
if self._use_peepholes:
  m = sigmoid(o + self._w_o_diag * c) * self._activation(c)
else:
  m = sigmoid(o) * self._activation(c)
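The peephole arithmetic can be tried out on plain Python lists. This is only a sketch: `peephole_step` is a hypothetical helper, the gate pre-activations `i`, `j`, `f`, `o` are passed in directly instead of being computed from a kernel, and tanh is assumed as the activation.

```python
import math


def sigmoid(x):
    return 1.0 / (1.0 + math.exp(-x))


def peephole_step(i, j, f, o, c_prev, w_i_diag, w_f_diag, w_o_diag,
                  forget_bias=1.0):
    """Cell and output update with diagonal (elementwise) peephole weights."""
    c = [sigmoid(fv + forget_bias + wf * cp) * cp
         + sigmoid(iv + wi * cp) * math.tanh(jv)
         for iv, jv, fv, cp, wi, wf in zip(i, j, f, c_prev, w_i_diag, w_f_diag)]
    m = [sigmoid(ov + wo * cv) * math.tanh(cv)
         for ov, cv, wo in zip(o, c, w_o_diag)]
    return c, m


zeros = [0.0]
c_prev = [2.0]

# Zero peephole weights reduce to the non-peephole update.
c_plain, m_plain = peephole_step(zeros, zeros, zeros, zeros, c_prev,
                                 [0.0], [0.0], [0.0])

# A strongly negative forget peephole closes the forget gate when the
# previous cell value is large and positive.
c_closed, _ = peephole_step(zeros, zeros, zeros, zeros, c_prev,
                            [0.0], [-10.0], [0.0])

print(c_plain, m_plain)
print(c_closed)
```

Because the weights are diagonal, each gate unit only sees its own cell unit, which is why the variables have shape `[num_units]` rather than `[num_units, num_units]`.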

class MultiRNNCell(RNNCell)

@tf_export(v1=["nn.rnn_cell.MultiRNNCell"])
class MultiRNNCell(RNNCell):
  """RNN cell composed sequentially of multiple simple cells.

  Example:

  ```python
  num_units = [128, 64]
  cells = [BasicLSTMCell(num_units=n) for n in num_units]
  stacked_rnn_cell = MultiRNNCell(cells)
  ```
  """

  @deprecated(None, "This class is equivalent as "
              "tf.keras.layers.StackedRNNCells, and will be replaced by "
              "that in Tensorflow 2.0.")
  def __init__(self, cells, state_is_tuple=True):
    """Create a RNN cell composed sequentially of a number of RNNCells.

    Args:
      cells: list of RNNCells that will be composed in this order.
      state_is_tuple: If True, accepted and returned states are n-tuples, where
        `n = len(cells)`.  If False, the states are all
        concatenated along the column axis.  This latter behavior will soon be
        deprecated.

    Raises:
      ValueError: if cells is empty (not allowed), or at least one of the cells
        returns a state tuple but the flag `state_is_tuple` is `False`.
    """
    super(MultiRNNCell, self).__init__()
    if not cells:
      raise ValueError("Must specify at least one cell for MultiRNNCell.")
    if not nest.is_sequence(cells):
      raise TypeError(
          "cells must be a list or tuple, but saw: %s." % cells)

    if len(set([id(cell) for cell in cells])) < len(cells):
      logging.log_first_n(logging.WARN,
                          "At least two cells provided to MultiRNNCell "
                          "are the same object and will share weights.", 1)

    self._cells = cells
    for cell_number, cell in enumerate(self._cells):
      # Add Checkpointable dependencies on these cells so their variables get
      # saved with this object when using object-based saving.
      if isinstance(cell, checkpointable.CheckpointableBase):
        # TODO(allenl): Track down non-Checkpointable callers.
        self._track_checkpointable(cell, name="cell-%d" % (cell_number,))
    self._state_is_tuple = state_is_tuple
    if not state_is_tuple:
      if any(nest.is_sequence(c.state_size) for c in self._cells):
        raise ValueError("Some cells return tuples of states, but the flag "
                         "state_is_tuple is not set.  State sizes are: %s"
                         % str([c.state_size for c in self._cells]))

  @property
  def state_size(self):
    if self._state_is_tuple:
      return tuple(cell.state_size for cell in self._cells)
    else:
      return sum(cell.state_size for cell in self._cells)

  @property
  def output_size(self):
    return self._cells[-1].output_size

  def zero_state(self, batch_size, dtype):
    with ops.name_scope(type(self).__name__ + "ZeroState", values=[batch_size]):
      if self._state_is_tuple:
        return tuple(cell.zero_state(batch_size, dtype) for cell in self._cells)
      else:
        # We know here that state_size of each cell is not a tuple and
        # presumably does not contain TensorArrays or anything else fancy
        return super(MultiRNNCell, self).zero_state(batch_size, dtype)

  @property
  def trainable_weights(self):
    if not self.trainable:
      return []
    weights = []
    for cell in self._cells:
      if isinstance(cell, base_layer.Layer):
        weights += cell.trainable_weights
    return weights

  @property
  def non_trainable_weights(self):
    weights = []
    for cell in self._cells:
      if isinstance(cell, base_layer.Layer):
        weights += cell.non_trainable_weights
    if not self.trainable:
      trainable_weights = []
      for cell in self._cells:
        if isinstance(cell, base_layer.Layer):
          trainable_weights += cell.trainable_weights
      return trainable_weights + weights
    return weights

  def call(self, inputs, state):
    """Run this multi-layer cell on inputs, starting from state."""
    cur_state_pos = 0
    cur_inp = inputs
    new_states = []
    for i, cell in enumerate(self._cells):
      with vs.variable_scope("cell_%d" % i):
        if self._state_is_tuple:
          if not nest.is_sequence(state):
            raise ValueError(
                "Expected state to be a tuple of length %d, but received: %s" %
                (len(self.state_size), state))
          cur_state = state[i]
        else:
          cur_state = array_ops.slice(state, [0, cur_state_pos],
                                      [-1, cell.state_size])
          cur_state_pos += cell.state_size
        cur_inp, new_state = cell(cur_inp, cur_state)
        new_states.append(new_state)

    new_states = (tuple(new_states) if self._state_is_tuple else
                  array_ops.concat(new_states, 1))

    return cur_inp, new_states
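The stacking logic in `MultiRNNCell.call` boils down to feeding each cell's output into the next cell while collecting the new states. The sketch below mimics that with toy cells. `ToyCell`, `stacked_call`, and `split_state` are hypothetical names for this example, and plain floats and lists stand in for tensors.

```python
class ToyCell:
    """Stand-in for an RNNCell: (output, next_state) = cell(input, state)."""

    def __init__(self, scale):
        self.scale = scale

    def __call__(self, inputs, state):
        new_state = state + self.scale * inputs
        return new_state, new_state  # the output equals the new state here


def stacked_call(cells, inputs, states):
    """Run the cells in order, as in the state_is_tuple=True branch."""
    cur_inp = inputs
    new_states = []
    for cell, cur_state in zip(cells, states):
        cur_inp, new_state = cell(cur_inp, cur_state)
        new_states.append(new_state)
    return cur_inp, tuple(new_states)


def split_state(flat_state, sizes):
    """Slice a concatenated state row per cell (state_is_tuple=False)."""
    pieces, pos = [], 0
    for size in sizes:
        pieces.append(flat_state[pos:pos + size])
        pos += size
    return pieces


cells = [ToyCell(2.0), ToyCell(0.5)]
output, states = stacked_call(cells, 1.0, (0.0, 10.0))
print(output, states)

pieces = split_state([1, 2, 3, 4, 5], sizes=[2, 3])
print(pieces)
```

The final output comes from the last cell only, which matches `output_size` returning `self._cells[-1].output_size`, while the state keeps one entry per layer.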
