Skip to content

Add support for AlternativeName and better support for CustomAttribute #632

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
210 changes: 210 additions & 0 deletions kmip/core/attributes.py
Original file line number Diff line number Diff line change
Expand Up @@ -1568,3 +1568,213 @@ def __str__(self):
'salt': self.salt,
'iteration_count': self.iteration_count
})


class AlternativeName(primitives.Struct):
"""
The AlternativeName attribute is used to identify and locate the object.

This attribute is assigned by the client, and the Alternative Name Value
is intended to be in a form that humans are able to interpret. The key
management system MAY specify rules by which the client creates valid
alternative names. Clients are informed of such rules by a mechanism that
is not specified by this standard. Alternative Names MAY NOT be unique
within a given key management server.

Attributes:
alternative_name_value: text string
alternative_name_type: enumeration

See Section 4.2 of the KMIP v2.1 specification for more information.
"""

def __init__(
self,
alternative_name_value=None,
alternative_name_type=None
):
"""
Construct an AlternativeName object.

Args:
alternative_name_value (string): Optional, defaults to None.
Required for read/write.
alternative_name_type (enumeration): Optional, defaults to None.
Required for read/write.
"""
super(AlternativeName, self).__init__(
enums.Tags.ALTERNATIVE_NAME
)

self._alternative_name_value = None
self._alternative_name_type = None

self.alternative_name_value = alternative_name_value
self.alternative_name_type = alternative_name_type

@property
def alternative_name_value(self):
if self._alternative_name_value:
return self._alternative_name_value.value
return None

@alternative_name_value.setter
def alternative_name_value(self, value):
if value is None:
self._alternative_name_value = None
elif isinstance(value, six.string_types):
self._alternative_name_value = primitives.TextString(
value=value,
tag=enums.Tags.ALTERNATIVE_NAME_VALUE
)
else:
raise TypeError("The alternative name value must be a string.")

@property
def alternative_name_type(self):
if self._alternative_name_type:
return self._alternative_name_type.value
return None

@alternative_name_type.setter
def alternative_name_type(self, value):
if value is None:
self._alternative_name_type = None
elif isinstance(value, enums.AlternativeNameType):
self._alternative_name_type = primitives.Enumeration(
enum=enums.AlternativeNameType,
tag=enums.Tags.ALTERNATIVE_NAME_TYPE,
value=value
)
else:
raise TypeError("The alternative name type must be an int.")

def read(self, input_buffer, kmip_version=enums.KMIPVersion.KMIP_1_0):
"""
Read the data encoding the AlternativeName attribute
and decode it.

Args:
input_buffer (stream): A data stream containing encoded object
data, supporting a read method; usually a BytearrayStream
object.
kmip_version (KMIPVersion): An enumeration defining the KMIP
version with which the object will be decoded. Optional,
defaults to KMIP 1.0.
"""
super(AlternativeName, self).read(
input_buffer,
kmip_version=kmip_version
)
local_buffer = utils.BytearrayStream(input_buffer.read(self.length))

if self.is_tag_next(enums.Tags.ALTERNATIVE_NAME_VALUE, local_buffer):
self._alternative_name_value = primitives.TextString(
tag=enums.Tags.ALTERNATIVE_NAME_VALUE
)
self._alternative_name_value.read(
local_buffer,
kmip_version=kmip_version
)
else:
raise exceptions.InvalidKmipEncoding(
"The AlternativeName encoding is missing the "
"AlternativeNameValue field."
)
if self.is_tag_next(enums.Tags.ALTERNATIVE_NAME_TYPE, local_buffer):
self._alternative_name_type = primitives.Enumeration(
enum=enums.AlternativeNameType,
tag=enums.Tags.ALTERNATIVE_NAME_TYPE
)
self._alternative_name_type.read(
local_buffer,
kmip_version=kmip_version
)
else:
raise exceptions.InvalidKmipEncoding(
"The AlternativeName encoding is missing the "
"AlternativeNameType field."
)

self.is_oversized(local_buffer)

def write(self, output_buffer, kmip_version=enums.KMIPVersion.KMIP_1_0):
"""
Write the data encoding the AlternativeName object to a
buffer.

Args:
output_buffer (stream): A data stream in which to encode object
data, supporting a write method; usually a BytearrayStream
object.
kmip_version (KMIPVersion): An enumeration defining the KMIP
version with which the object will be encoded. Optional,
defaults to KMIP 1.0.
"""
local_buffer = utils.BytearrayStream()

if self._alternative_name_value:
self._alternative_name_value.write(
local_buffer,
kmip_version=kmip_version
)
else:
raise exceptions.InvalidField(
"The AlternativeName object is missing the "
"AlternativeNameValue field."
)
if self._alternative_name_type:
self._alternative_name_type.write(
local_buffer,
kmip_version=kmip_version
)
else:
raise exceptions.InvalidField(
"The AlternativeName object is missing the "
"AlternativeNameType field."
)

self.length = local_buffer.length()
super(AlternativeName, self).write(
output_buffer,
kmip_version=kmip_version
)
output_buffer.write(local_buffer.buffer)

def __repr__(self):
args = [
"alternative_name_value={}".format(
repr(self.alternative_name_value)
),
"alternative_name_type={}".format(repr(self.alternative_name_type))
]
return "AlternativeName({})".format(", ".join(args))

def __str__(self):
value = ", ".join(
[
'"alternative_name_value": "{}"'.format(
self.alternative_name_value
),
'"alternative_name_type": "{}"'.format(
self.alternative_name_type
)
]
)
return "{" + value + "}"

def __eq__(self, other):
if isinstance(other, AlternativeName):
if self.alternative_name_value != other.alternative_name_value:
return False
if self.alternative_name_type != other.alternative_name_type:
return False
return True
else:
return NotImplemented

def __ne__(self, other):
if isinstance(other, AlternativeName):
return not self.__eq__(other)
else:
return NotImplemented
13 changes: 13 additions & 0 deletions kmip/core/factories/attribute_values.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,8 @@ def create_attribute_value(self, name, value):
return primitives.Boolean(value, enums.Tags.SENSITIVE)
elif name is enums.AttributeType.CUSTOM_ATTRIBUTE:
return attributes.CustomAttribute(value)
elif name is enums.AttributeType.ALTERNATIVE_NAME:
return self._create_alternative_name(value)
else:
if not isinstance(name, str):
raise ValueError('Unrecognized attribute type: '
Expand Down Expand Up @@ -200,6 +202,8 @@ def create_attribute_value_by_enum(self, enum, value):
return primitives.Boolean(value, enums.Tags.SENSITIVE)
elif enum is enums.Tags.CUSTOM_ATTRIBUTE:
return attributes.CustomAttribute(value)
elif enum is enums.Tags.ALTERNATIVE_NAME:
return self._create_alternative_name(value)
else:
raise ValueError("Unrecognized attribute type: {}".format(enum))

Expand Down Expand Up @@ -272,6 +276,15 @@ def _create_cryptographic_usage_mask(self, flags):

return attributes.CryptographicUsageMask(mask)

def _create_alternative_name(self, info):
if info:
return attributes.AlternativeName(
alternative_name_value=info.get('alternative_name_value'),
alternative_name_type=info.get('alternative_name_type')
)
else:
return attributes.AlternativeName()

def _create_application_specific_information(self, info):
if info:
return attributes.ApplicationSpecificInformation(
Expand Down
2 changes: 1 addition & 1 deletion kmip/demos/pie/locate.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@
for initial_date in initial_dates:
try:
t = time.strptime(initial_date)
except ValueError, TypeError:
except (ValueError, TypeError):
logger.error(
"Invalid initial date provided: {}".format(initial_date)
)
Expand Down
17 changes: 17 additions & 0 deletions kmip/pie/objects.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,23 @@ class ManagedObject(sql.Base):
order_by="ManagedObjectName.id"
)
names = association_proxy('_names', 'name')

alternative_names = sqlalchemy.orm.relationship(
"ManagedObjectAlternativeName",
back_populates="mo",
cascade="all, delete-orphan",
order_by="ManagedObjectAlternativeName.id",
passive_deletes=True
)

custom_attributes = sqlalchemy.orm.relationship(
"ManagedObjectCustomAttribute",
back_populates="mo",
cascade="all, delete-orphan",
order_by="ManagedObjectCustomAttribute.id",
passive_deletes=True
)

operation_policy_name = Column(
'operation_policy_name',
String(50),
Expand Down
76 changes: 75 additions & 1 deletion kmip/pie/sqltypes.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,6 @@ def process_result_value(self, value, dialect):


class ManagedObjectName(Base):

__tablename__ = 'managed_object_names'
id = Column('id', Integer, primary_key=True)
mo_uid = Column('mo_uid', Integer, ForeignKey('managed_objects.uid'))
Expand Down Expand Up @@ -167,3 +166,78 @@ def __ne__(self, other):
return not (self == other)
else:
return NotImplemented


class ManagedObjectAlternativeName(Base):
__tablename__ = 'managed_object_alternative_names'
id = Column('id', Integer, primary_key=True)
mo_uid = Column('mo_uid', Integer, ForeignKey('managed_objects.uid'))
alternative_name_value = Column('alternative_name_value', String)
alternative_name_type = Column(
'alternative_name_type',
EnumType(enums.AlternativeNameType)
)

mo = relationship('ManagedObject', back_populates='alternative_names')

def __init__(self, alternative_name_value, alternative_name_type):
self.alternative_name_value = alternative_name_value
self.alternative_name_type = alternative_name_type

def __repr__(self):
return ("<ManagedObjectAlternativeName(name='%s', type='%s')>" %
(self.alternative_name_value, self.alternative_name_type))

def __eq__(self, other):
if isinstance(other, ManagedObjectAlternativeName):
if self.alternative_name_value != other.alternative_name_value:
return False
elif self.alternative_name_type != other.alternative_name_type:
return False
else:
return True
else:
return NotImplemented

def __ne__(self, other):
if isinstance(other, ManagedObjectAlternativeName):
return not (self == other)
else:
return NotImplemented


class ManagedObjectCustomAttribute(Base):
__tablename__ = 'managed_object_custom_attributes'
id = Column('id', Integer, primary_key=True)
mo_uid = Column('mo_uid', Integer, ForeignKey('managed_objects.uid'))
attribute_name = Column('attribute_name', String)
attribute_text = Column('attribute_text', String)

mo = relationship('ManagedObject', back_populates='custom_attributes')

def __init__(self, attribute_name, attribute_text):
self.attribute_name = attribute_name
self.attribute_text = attribute_text

def __repr__(self):
return (
"<ManagedObjectCustomAttribute(alternative_name='%s', \
alternative_name_type='%s')>" % (self.attribute_name, self.attribute_text)
)

def __eq__(self, other):
if isinstance(other, ManagedObjectCustomAttribute):
if self.attribute_name != other.attribute_name:
return False
elif self.attribute_text != other.attribute_text:
return False
else:
return True
else:
return NotImplemented

def __ne__(self, other):
if isinstance(other, ManagedObjectCustomAttribute):
return not (self == other)
else:
return NotImplemented
Loading