#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# (C) Copyright 2017-2020 Hewlett Packard Enterprise Development LP
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Module with HPE XP disk array wrapper."""
import logging
import warnings
import requests
from requests.packages.urllib3.exceptions import InsecureRequestWarning
from hpestorapi.base import BaseDevice, AuthError, ParameterError
if __name__ == "__main__":
pass
logging.getLogger('hpestorapi.xp').addHandler(logging.NullHandler())
LOG = logging.getLogger('hpestorapi.xp')
class ConfManager(BaseDevice):
"""Base class for all Configuration Manager objects."""
def __init__(self, address, port=None, ssl=True):
"""Initialize Configuration manager object."""
super().__init__()
self.cvae_addr = address
self.cvae_port = port
self.cvae_ssl = ssl
self._headers = {
'Accept': 'application/json',
'Content-Type': 'application/json'
}
@property
def _base_url(self) -> str:
"""
Generate static part of URL.
:rtype: str
:return: Static part of URL
"""
proto = 'https' if self.cvae_ssl else 'http'
if self.cvae_port is None:
port = 25451 if self.cvae_ssl else 25450
else:
port = self.cvae_port
return f'{proto}://{self.cvae_addr}:{port}/ConfigurationManager'
def __str__(self):
class_name = self.__class__.__name__
return f'<class hpestorapi.{class_name}(address={self.cvae_addr})>'
def _query(self, url, method, **kwargs):
# Copy allowed args to options dict
options = dict()
allowed = ['params',
'json',
'headers',
'auth',
'delay',
'verify',
'cert']
for key in kwargs:
if key in allowed:
options[key] = kwargs[key]
# Add standart HTTP and auth headers to parameter list
if kwargs.get('headers') is not None:
options['headers'].update(self._headers)
else:
options['headers'] = self._headers
# By default SSL cert checking is disabled
certcheck = kwargs.get('verify', False)
# Set HTTP delay (if not set by user)
timeout = kwargs.get('delay', self.timeout)
# Prepare request
path = f'{self._base_url}/{url}'
req = requests.Request(method, path, **options)
prep = req.prepare()
LOG.debug('%s(`%s`)', method, path)
# Perform request with runtime measuring
with warnings.catch_warnings():
warnings.filterwarnings('ignore', category=InsecureRequestWarning)
try:
session = requests.Session()
resp = session.send(prep, verify=certcheck, timeout=timeout)
deltafmt = '%d.%d sec' % (resp.elapsed.seconds,
resp.elapsed.microseconds // 1000)
except Exception as error:
LOG.fatal('Cannot connect to Configuration Manager. %s',
error)
raise error
# Check Rest service response
if resp.status_code != requests.codes.ok:
LOG.warning('Return code %s, response delay %s',
resp.status_code,
deltafmt)
LOG.warning('resp.content=%s', resp.content)
LOG.warning('resp.reason=%s', resp.reason)
else:
LOG.debug('Rest server return status %s, delay %s',
resp.status_code,
deltafmt)
# Check JSON string and return response
try:
jdata = resp.json()
except ValueError:
if resp.content:
LOG.warning('Cannot decode JSON. Source string: %s',
resp.content)
return resp.status_code, None
return resp.status_code, jdata # success = True, data = json
class CommandViewAE(ConfManager):
"""Command View Advanced Edition object."""
def device_reg(self, svp, serialnum, username, password, gen='XP7'):
"""Register new XP storage array."""
LOG.debug('Trying to register new storage device on Configuration '
'manager. Serial Number:%s, Generation=%s.',
serialnum,
gen)
# Perform device registration
param = {
'svpIp': svp,
'serialNumber': serialnum,
'model': gen}
status, _ = self._query(
'v1/objects/storages',
method='POST',
json=param,
auth=(username, password))
if status == requests.codes.ok:
LOG.info('Storage device succefully registered in '
'Configuration manager.')
else:
LOG.warning('Storage device registration failure. Serial '
'Number:%s, Generation=%s.',
serialnum,
gen)
return status
def device_unreg(self, serialnum, username, password):
"""Remove XP storage array registration."""
LOG.debug('Trying to remove storage device registration on '
'Configuration manager. Serial Number:%s.',
serialnum)
array = self.device_find(serialnum)
if array is None:
LOG.info('Cannot remove storage device registration from '
'Configuration Manager database. Storage system not '
'found. Serial Number:%s',
serialnum)
else:
path = 'v1/objects/storages/%s' % array['storageDeviceId']
status, _ = self._query(path,
method='DELETE',
auth=(username, password))
if status == requests.codes.ok:
LOG.info('Storage system registration sucessfully removed '
'from Configuration manager database. Serial '
'Number:%s',
serialnum)
return True
LOG.error('Cannot remove storage system registration from '
'Configuration Manager database. Rest API server '
'return code:%s. Serial Number:%s',
status, serialnum)
return False
def device_find(self, serialnum):
"""Find XP storage array in list of registered."""
status, data = self._query('v1/objects/storages', method='GET')
if status == requests.codes.ok:
for array in data.get('data'):
if str(serialnum) == str(array.get('serialNumber')):
return array
return None
[docs]class Xp(ConfManager):
"""XP7 / P9500 class implementation."""
[docs] def __init__(self, cvae, svp, serialnum, username, password, gen='XP7',
port=23451, ssl=True):
"""
HPE XP constructor.
:param str cvae: Hostname or IP address of HPE Command View AE
Configuration Manager.
:param str svp: Hostname or IP address of Service Processor (SVP).
:param str serialnum: Array serial number (5 or 6 digits)
:param str username: User name for HPE XP disk array. Creating a
dedicated user with limited rights is recommended. Using
"arrayadmin" account is a bad practice.
:param str password: Password for HPE XP disk array.
:param str gen: (optional) Disk array generation. Should be P9500 or
XP7. Default value: XP7.
:param int port: (optional) Command View AE configuration manager
port. Default value: 23451
:param bool ssl: (optional) Use secure https (True) or plain http (
False). Default value: True.
:return: None.
"""
super().__init__(cvae, port, ssl)
self._session = {'id': None, 'token': None}
self._gen = gen
self._svp = svp
self._serialnum = serialnum
self._username = username
self._password = password
def __del__(self):
self.close()
[docs] def open(self):
"""
Open a new Rest API session for a HPE XP array.
Call it prior any other requests. Call :meth:`Xp.close()` if you do not
plan to use session anymore.
:rtype: bool
:return: Return True, if disk array provides a valid session key.
"""
status, data = self.post(
'sessions',
auth=(self._username, self._password)
)
if status == requests.codes.ok:
# Session succefully opened
LOG.info('Access token and session ID succefully received for '
'storage device. Serial Number:%s', self._serialnum)
self._session['id'] = data['sessionId']
self._session['token'] = data['token']
self._headers['Authorization'] = 'Session ' + data['token']
return True
if status == requests.codes.not_found:
# Storage is not registered in Configuration Manager
LOG.info('Storage device is not registered in Configuration '
' manager yet. Lets try to resolve. Serial Number:',
self._serialnum)
if data['messageId'] == 'KART30070-E':
cvae = CommandViewAE(self.cvae_addr, self.cvae_port,
self.cvae_ssl)
status = cvae.device_reg(self._svp,
self._serialnum,
self._username,
self._password,
self._gen)
if status == requests.codes.ok:
return self.open()
elif status == requests.codes.unauthorized:
LOG.fatal('Cannot open Rest API session - wrong user name or '
'password. Serial Number:%s',
self._serialnum)
raise AuthError('''
id: "{id}",
url: "{url}",
message: "{msg}",
cause: "{cause}",
solution: "{sol}"
'''.format(id=data['messageId'],
url=data['errorSource'],
msg=data['message'],
cause=data['cause'],
sol=data['solution'])
)
return False
[docs] def close(self, passive=False):
"""
Close Rest API session.
:param bool passive: (optional) Do not try to close session in the
array. Revoke the client session key after session timed out. Use
this parameter only for method overriding in subclasses. There is no
need of using it, as class implementation manages the session
life cycle. Default value: False.
:return: None
"""
# Only for active session discard
if (self._session.get('id') is not None) and (not passive):
LOG.debug('Lets close Rest API session. '
'Serial Number:%s', self._serialnum)
self.delete('sessions/%s' % self._session['id'])
# Clear session info (for all sessions)
self._session['id'] = self._session['token'] = None
[docs] def get(self, url, **kwargs):
"""
Make a HTTP GET request to HPE XP array. Use this method to get \
information about array objects.
:param str url: URL address. Th static part of the URL address is
generated automatically. Example ofvalid URL: 'pools',
'parity-groups', 'ldevs'. All available URL's and requests result
are described in "HPE XP7 Command View Advanced Edition REST API
Reference Guide".
:param dict params: (optional) Dictionary with query filter
parameters. Described in 'Query parameters' section of "HPE XP7
Command View Advanced Edition REST API Reference Guide".
:param dict json: (optional) A JSON serializable object send in the
request body.
:param float timeout: (optional) Number of seconds that Rest API
client waits for a response from the Rest server before
generating a timeout exception. Default value: :attr:`Xp.timeout`.
:param bool verify: (optional) Either a boolean, controlling
the Rest server's TLS certificate verification, or a string,
where it is a path to a CA bundle. Default value: False (no
certificate verification).
:param str cert: (optional) String with path to ssl client
certificate file (.pem) or tuple pair (‘cert’, ‘key’).
:rtype: (int, {})
:return: Tuple with HTTP status code and dict with request result.
For example: (200, {‘key’:’value’})
"""
return self._query(url, 'GET', **kwargs)
[docs] def post(self, url, **kwargs):
"""
Make a HTTP POST request to HPE XP array. Use this method to create \
a new object.
:param dict json: (optional) A JSON serializable object to send in
the body of request.
:param float timeout: (optional) Number of seconds that Rest API
client waits for a response from the Rest server before
generating a timeout exception. Default value: :attr:`Xp.timeout`.
:param bool verify: (optional) Either a boolean, controlling
the Rest server's TLS certificate verification, or a string,
where it is a path to a CA bundle. Default value: False (no
certificate verification).
:param str cert: (optional) String with path to ssl client
certificate file (.pem) or tuple pair (‘cert’, ‘key’).
:rtype: (int, {})
:return: Tuple with HTTP status code and dict with request result.
For example: (200, {‘key’:’value’})
"""
return self._query(url, 'POST', **kwargs)
[docs] def delete(self, url, **kwargs):
"""
Make a HTTP DELETE request to HPE XP array. Use this method to remove \
objects.
:param dict json: (optional) A JSON serializable object send in the
request body.
:param float timeout: (optional) Number of seconds that Rest API
client waits for a response from the Rest server before
generating a timeout exception. Default value: :attr:`Xp.timeout`.
:param bool verify: (optional) Either a boolean, controlling
the Rest server's TLS certificate verification, or a string,
where it is a path to a CA bundle. Default value: False (no
certificate verification).
:param str cert: (optional) String with path to ssl client
certificate file (.pem) or tuple pair (‘cert’, ‘key’).
:rtype: (int, {})
:return: Tuple with HTTP status code and dict with request result.
For example: (200, {‘key’:’value’})
"""
return self._query(url, 'DELETE', **kwargs)
[docs] def put(self, url, **kwargs):
"""
Perform HTTP PUT request to HPE XP array.
:param dict json: (optional) A JSON serializable object to send in
the body of request.
:param float timeout: (optional) Number of seconds that Rest API
client waits for a response from the Rest server before
generating a timeout exception. Default value: :attr:`Xp.timeout`.
:param bool verify: (optional) Either a boolean, controlling
the Rest server's TLS certificate verification, or a string,
where it is a path to a CA bundle. Default value: False (no
certificate verification).
:param str cert: (optional) String with path to ssl client
certificate file (.pem) or tuple pair (‘cert’, ‘key’).
:rtype: (int, {})
:return: Tuple with HTTP status code and dict with request result.
For example: (200, {‘key’:’value’})
"""
return self._query(url, 'PUT', **kwargs)
def _query(self, url, method, **kwargs):
status, data = ConfManager._query(self, url, method, **kwargs)
# If session was expired
if self._is_expired(status, data):
# Clear old session record
self._headers.pop('Authorization')
# Get new session token
self.close(passive=True)
self.open()
# Replay last request
return ConfManager._query(self, url, method, **kwargs)
return status, data
def _is_expired(self, status, data):
"""Check Rest API session timeout error."""
# Authorization token wasnt received before
if self._headers.get('Authorization') is None:
return False
# If provided token is timed out
if (status == requests.codes.unauthorized) and \
(data.get('messageId') == 'KART40047-E'):
LOG.info('Looks like current access token and session are '
' expired. Session ID:%s, Serial Number:%s',
self._session['id'],
self._serialnum)
return True
return False
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.close()
def __str__(self):
class_name = self.__class__.__name__
return f'<class hpestorapi.{class_name}(dev={self._serialnum})>'
@property
def _base_url(self) -> str:
"""
Generate static part of URL.
:rtype: str
:return: Static part of URL
"""
base = super()._base_url
# Generate device id
if self._gen == 'P9500':
dev = '7' + str(self._serialnum).rjust(11, '0')
elif self._gen == 'XP7':
dev = '8' + str(self._serialnum).rjust(11, '0')
else:
LOG.fatal('Unknown array generation. gen="%s"', self._gen)
raise ParameterError(f'Unknown array generation. gen={self._gen}.')
return f'{base}/v1/objects/storages/{dev}'