Module reclab.recommenders.cfnade.cfnade_lib.nade

Expand source code
import tensorflow as tf
from keras.engine import Layer, InputSpec
from keras import backend as K
from keras import initializers
from keras import regularizers
from keras import constraints

# def dot_product(x, kernel):
#     """
#     Wrapper for dot product operation, in order to be compatible with both
#     Theano and Tensorflow
#     Args:
#         x (): input
#         kernel (): weights
#     Returns:
#     """
#     return K.squeeze(K.dot(x, K.expand_dims(kernel)), axis=-1)


class NADE(Layer):
    def __init__(self,
                 hidden_dim,
                 activation,
                 W_regularizer=None,
                 V_regularizer=None,
                 b_regularizer=None,
                 c_regularizer=None,
                 bias=False, 
                 normalized_layer=False,
                 **kwargs):

        self.init = initializers.get('uniform')

        self.bias = bias
        self.activation = activation
        self.hidden_dim = hidden_dim

        self.W_regularizer = regularizers.get(W_regularizer)
        self.V_regularizer = regularizers.get(V_regularizer)
        self.b_regularizer = regularizers.get(b_regularizer)
        self.c_regularizer = regularizers.get(c_regularizer)
        
        self.normalized_layer = normalized_layer

        super(NADE, self).__init__(**kwargs)

    def build(self, input_shape):
        self.input_dim1 = input_shape[1]
        self.input_dim2 = input_shape[2]

        self.W = self.add_weight(
            shape=(self.input_dim1, self.input_dim2, self.hidden_dim),
            initializer=self.init,
            name='{}_W'.format(self.name),
            regularizer=self.W_regularizer)
        if self.bias:
            self.c = self.add_weight(
                shape=(self.hidden_dim, ),
                initializer=self.init,
                name='{}_c'.format(self.name),
                regularizer=self.c_regularizer)

        if self.bias:
            self.b = self.add_weight(
                shape=(self.input_dim1, self.input_dim2),
                initializer=self.init,
                name='{}_b'.format(self.name),
                regularizer=self.b_regularizer)

        self.V = self.add_weight(
            shape=(self.hidden_dim, self.input_dim1, self.input_dim2),
            initializer=self.init,
            name='{}_V'.format(self.name),
            regularizer=self.V_regularizer)

        super().build(input_shape)

    def call(self, original_x):

        x = K.cumsum(original_x[:, :, ::-1], axis=2)[:, :, ::-1]
        # x.shape = (?,6040,5)
        # W.shape = (6040, 5, 500)
        # c.shape = (500,)
        output_ = tf.tensordot(x, self.W, axes=[[1, 2], [0, 1]])
       
        if self.normalized_layer:
            output_ /= tf.matmul(
                tf.maximum(
                    tf.reshape(
                        tf.reduce_sum(
                            tf.reduce_sum(original_x, axis=2), axis=1),
                        [-1, 1]), 1), tf.ones([1, output_.shape[1]]))

        if self.bias:
            output_ = output_ + self.c

        h_out = tf.reshape(output_, [-1, self.hidden_dim])
        #tf.cast(indices, tf.float32)
        # output_.shape = (?,500)

        h_out_act = K.tanh(h_out)
        # h_out_act.shape = (?,500)
        # V.shape = (500, 6040, 5)
        # b.shape = (6040,5)
        if self.bias:
            output = tf.tensordot(h_out_act, self.V, axes=[[1], [0]]) + self.b
        else:
            output = tf.tensordot(h_out_act, self.V, axes=[[1], [0]])
        # output.shape = (?,6040,5)
        output = tf.reshape(output, [-1, self.input_dim1, self.input_dim2])
        return output

    def compute_output_shape(self, input_shape):
        return (input_shape[0], input_shape[1], input_shape[2])

Classes

class NADE (hidden_dim, activation, W_regularizer=None, V_regularizer=None, b_regularizer=None, c_regularizer=None, bias=False, normalized_layer=False, **kwargs)

Abstract base layer class.

Properties

input, output: Input/output tensor(s). Note that if the layer
    is used more than once (shared layer), this is ill-defined
    and will raise an exception. In such cases, use
    <code>layer.get\_input\_at(node\_index)</code>.
input_mask, output_mask: Mask tensors. Same caveats apply as
    input, output.
input_shape: Shape tuple. Provided for convenience, but note
    that there may be cases in which this attribute is
    ill-defined (e.g. a shared layer with multiple input
    shapes), in which case requesting <code>input\_shape</code> will raise
    an Exception. Prefer using
    <code>layer.get\_input\_shape\_at(node\_index)</code>.
input_spec: List of InputSpec class instances
    each entry describes one required input:
        - ndim
        - dtype
    A layer with <code>n</code> input tensors must have
    an <code>input\_spec</code> of length <code>n</code>.
name: String, must be unique within a model.
non_trainable_weights: List of variables.
output_shape: Shape tuple. See <code>input\_shape</code>.
stateful: Boolean indicating whether the layer carries
    additional non-weight state. Used in, for instance, RNN
    cells to carry information between batches.
supports_masking: Boolean indicator of whether the layer
    supports masking, typically for unused timesteps in a
    sequence.
trainable: Boolean, whether the layer weights
    will be updated during training.
trainable_weights: List of variables.
uses_learning_phase: Whether any operation
    of the layer uses <code>K.in\_training\_phase()</code>
    or <code>K.in\_test\_phase()</code>.
weights: The concatenation of the lists trainable_weights and
    non_trainable_weights (in this order).
dtype:  Default dtype of the layers's weights.

Methods

call(x, mask=None): Where the layer's logic lives.
__call__(x, mask=None): Wrapper around the layer logic (<code>call</code>).
    If x is a Keras tensor:
        - Connect current layer with last layer from tensor:
            <code>self.\_add\_inbound\_node(last\_layer)</code>
        - Add layer to tensor history
    If layer is not built:
        - Build from x._keras_shape
compute_mask(x, mask)
compute_output_shape(input_shape)
count_params()
get_config()
get_input_at(node_index)
get_input_mask_at(node_index)
get_input_shape_at(node_index)
get_output_at(node_index)
get_output_mask_at(node_index)
get_output_shape_at(node_index)
get_weights()
set_weights(weights)

Class Methods

from_config(config)

Internal methods:

_add_inbound_node(layer, index=0)
assert_input_compatibility()
build(input_shape)
Expand source code
class NADE(Layer):
    def __init__(self,
                 hidden_dim,
                 activation,
                 W_regularizer=None,
                 V_regularizer=None,
                 b_regularizer=None,
                 c_regularizer=None,
                 bias=False, 
                 normalized_layer=False,
                 **kwargs):

        self.init = initializers.get('uniform')

        self.bias = bias
        self.activation = activation
        self.hidden_dim = hidden_dim

        self.W_regularizer = regularizers.get(W_regularizer)
        self.V_regularizer = regularizers.get(V_regularizer)
        self.b_regularizer = regularizers.get(b_regularizer)
        self.c_regularizer = regularizers.get(c_regularizer)
        
        self.normalized_layer = normalized_layer

        super(NADE, self).__init__(**kwargs)

    def build(self, input_shape):
        self.input_dim1 = input_shape[1]
        self.input_dim2 = input_shape[2]

        self.W = self.add_weight(
            shape=(self.input_dim1, self.input_dim2, self.hidden_dim),
            initializer=self.init,
            name='{}_W'.format(self.name),
            regularizer=self.W_regularizer)
        if self.bias:
            self.c = self.add_weight(
                shape=(self.hidden_dim, ),
                initializer=self.init,
                name='{}_c'.format(self.name),
                regularizer=self.c_regularizer)

        if self.bias:
            self.b = self.add_weight(
                shape=(self.input_dim1, self.input_dim2),
                initializer=self.init,
                name='{}_b'.format(self.name),
                regularizer=self.b_regularizer)

        self.V = self.add_weight(
            shape=(self.hidden_dim, self.input_dim1, self.input_dim2),
            initializer=self.init,
            name='{}_V'.format(self.name),
            regularizer=self.V_regularizer)

        super().build(input_shape)

    def call(self, original_x):

        x = K.cumsum(original_x[:, :, ::-1], axis=2)[:, :, ::-1]
        # x.shape = (?,6040,5)
        # W.shape = (6040, 5, 500)
        # c.shape = (500,)
        output_ = tf.tensordot(x, self.W, axes=[[1, 2], [0, 1]])
       
        if self.normalized_layer:
            output_ /= tf.matmul(
                tf.maximum(
                    tf.reshape(
                        tf.reduce_sum(
                            tf.reduce_sum(original_x, axis=2), axis=1),
                        [-1, 1]), 1), tf.ones([1, output_.shape[1]]))

        if self.bias:
            output_ = output_ + self.c

        h_out = tf.reshape(output_, [-1, self.hidden_dim])
        #tf.cast(indices, tf.float32)
        # output_.shape = (?,500)

        h_out_act = K.tanh(h_out)
        # h_out_act.shape = (?,500)
        # V.shape = (500, 6040, 5)
        # b.shape = (6040,5)
        if self.bias:
            output = tf.tensordot(h_out_act, self.V, axes=[[1], [0]]) + self.b
        else:
            output = tf.tensordot(h_out_act, self.V, axes=[[1], [0]])
        # output.shape = (?,6040,5)
        output = tf.reshape(output, [-1, self.input_dim1, self.input_dim2])
        return output

    def compute_output_shape(self, input_shape):
        return (input_shape[0], input_shape[1], input_shape[2])

Ancestors

  • keras.engine.base_layer.Layer

Methods

def build(self, input_shape)

Creates the layer weights.

Must be implemented on all layers that have weights.

Arguments

input_shape: Keras tensor (future input to layer)
    or list/tuple of Keras tensors to reference
    for weight shape computations.
Expand source code
def build(self, input_shape):
    self.input_dim1 = input_shape[1]
    self.input_dim2 = input_shape[2]

    self.W = self.add_weight(
        shape=(self.input_dim1, self.input_dim2, self.hidden_dim),
        initializer=self.init,
        name='{}_W'.format(self.name),
        regularizer=self.W_regularizer)
    if self.bias:
        self.c = self.add_weight(
            shape=(self.hidden_dim, ),
            initializer=self.init,
            name='{}_c'.format(self.name),
            regularizer=self.c_regularizer)

    if self.bias:
        self.b = self.add_weight(
            shape=(self.input_dim1, self.input_dim2),
            initializer=self.init,
            name='{}_b'.format(self.name),
            regularizer=self.b_regularizer)

    self.V = self.add_weight(
        shape=(self.hidden_dim, self.input_dim1, self.input_dim2),
        initializer=self.init,
        name='{}_V'.format(self.name),
        regularizer=self.V_regularizer)

    super().build(input_shape)
def call(self, original_x)

This is where the layer's logic lives.

Arguments

inputs: Input tensor, or list/tuple of input tensors.
**kwargs: Additional keyword arguments.

Returns

A tensor or list/tuple of tensors.
Expand source code
def call(self, original_x):

    x = K.cumsum(original_x[:, :, ::-1], axis=2)[:, :, ::-1]
    # x.shape = (?,6040,5)
    # W.shape = (6040, 5, 500)
    # c.shape = (500,)
    output_ = tf.tensordot(x, self.W, axes=[[1, 2], [0, 1]])
   
    if self.normalized_layer:
        output_ /= tf.matmul(
            tf.maximum(
                tf.reshape(
                    tf.reduce_sum(
                        tf.reduce_sum(original_x, axis=2), axis=1),
                    [-1, 1]), 1), tf.ones([1, output_.shape[1]]))

    if self.bias:
        output_ = output_ + self.c

    h_out = tf.reshape(output_, [-1, self.hidden_dim])
    #tf.cast(indices, tf.float32)
    # output_.shape = (?,500)

    h_out_act = K.tanh(h_out)
    # h_out_act.shape = (?,500)
    # V.shape = (500, 6040, 5)
    # b.shape = (6040,5)
    if self.bias:
        output = tf.tensordot(h_out_act, self.V, axes=[[1], [0]]) + self.b
    else:
        output = tf.tensordot(h_out_act, self.V, axes=[[1], [0]])
    # output.shape = (?,6040,5)
    output = tf.reshape(output, [-1, self.input_dim1, self.input_dim2])
    return output
def compute_output_shape(self, input_shape)

Computes the output shape of the layer.

Assumes that the layer will be built to match that input shape provided.

Arguments

input_shape: Shape tuple (tuple of integers)
    or list of shape tuples (one per output tensor of the layer).
    Shape tuples can include None for free dimensions,
    instead of an integer.

Returns

An output shape tuple.
Expand source code
def compute_output_shape(self, input_shape):
    return (input_shape[0], input_shape[1], input_shape[2])