本产品推出了新版本。
简体中文版经机器翻译而成,仅供参考。如与英语版出现任何冲突,应以英语版为准。
通用支持模块
贡献者
建议更改
所有 Python 脚本都使用同一个模块中定义的通用 Python 类。
#!/usr/bin/env python
##--------------------------------------------------------------------
#
# File: deploy_requests.py
#
# (C) Copyright 2019 NetApp, Inc.
#
# This sample code is provided AS IS, with no support or warranties of
# any kind, including but not limited for warranties of merchantability
# or fitness of any kind, expressed or implied. Permission to use,
# reproduce, modify and create derivatives of the sample code is granted
# solely for the purpose of researching, designing, developing and
# testing a software application product for use with NetApp products,
# provided that the above copyright notice appears in all copies and
# that the software application product is distributed pursuant to terms
# no less restrictive than those set forth herein.
#
##--------------------------------------------------------------------
import json
import logging
import requests
# Silence urllib3 warnings process-wide. Every request issued by the class
# below passes verify=False, which would otherwise emit a warning per call.
requests.packages.urllib3.disable_warnings()
class DeployRequests(object):
    """
    Wrapper class for requests that simplifies the ONTAP Select Deploy
    path creation and header manipulations for simpler code.

    Every request is sent to ``https://<ip>/api<path>`` using HTTP basic
    authentication as the ``admin`` user. Any response with a status code
    of 400 or above is logged and turned into an exception (see
    ``exit_on_errors``).

    NOTE(security): every request is sent with ``verify=False``, so the
    server's TLS certificate is NOT validated. Review this before using the
    class outside of a lab environment.
    """

    def __init__(self, ip, admin_password):
        """
        Store the connection settings; no request is made here.

        :param ip: host name or IP address used to build the base URL.
        :param admin_password: password for the ``admin`` account.
        """
        self.base_url = 'https://{}/api'.format(ip)
        self.auth = ('admin', admin_password)
        self.headers = {'Accept': 'application/json'}
        self.logger = logging.getLogger('deploy')

    def post(self, path, data, files=None, wait_for_job=False):
        """
        POST to ``path``.

        When ``files`` is given it is uploaded as multipart data and ``data``
        is not sent; otherwise ``data`` is sent as the JSON body.

        :param path: resource path appended to the base URL.
        :param data: JSON-serialisable body (ignored when ``files`` is set).
        :param files: optional ``files`` argument passed through to requests.
        :param wait_for_job: when True and the response status is 202, block
            until the background job finishes.
        :return: the ``requests`` response object.
        """
        if files:
            self.logger.debug('POST FILES:')
            response = requests.post(self.base_url + path,
                                     auth=self.auth, verify=False,
                                     files=files)
        else:
            self.logger.debug('POST DATA: %s', data)
            response = requests.post(self.base_url + path,
                                     auth=self.auth, verify=False,
                                     json=data,
                                     headers=self.headers)
        return self._finish_request(response, wait_for_job)

    def patch(self, path, data, wait_for_job=False):
        """
        PATCH ``path`` with ``data`` as the JSON body.

        :param path: resource path appended to the base URL.
        :param data: JSON-serialisable body.
        :param wait_for_job: when True and the response status is 202, block
            until the background job finishes.
        :return: the ``requests`` response object.
        """
        self.logger.debug('PATCH DATA: %s', data)
        response = requests.patch(self.base_url + path,
                                  auth=self.auth, verify=False,
                                  json=data,
                                  headers=self.headers)
        return self._finish_request(response, wait_for_job)

    def put(self, path, data, files=None, wait_for_job=False):
        """
        PUT to ``path``.

        When ``files`` is given, ``data`` is sent as form data alongside the
        uploaded files; otherwise ``data`` is sent as the JSON body.

        :param path: resource path appended to the base URL.
        :param data: form fields (with ``files``) or JSON-serialisable body.
        :param files: optional ``files`` argument passed through to requests.
        :param wait_for_job: when True and the response status is 202, block
            until the background job finishes.
        :return: the ``requests`` response object.
        """
        if files:
            # Logged through the logger (not print) so that output follows
            # the same logging configuration as post() and patch().
            self.logger.debug('PUT FILES: %s', data)
            response = requests.put(self.base_url + path,
                                    auth=self.auth, verify=False,
                                    data=data,
                                    files=files)
        else:
            self.logger.debug('PUT DATA: %s', data)
            response = requests.put(self.base_url + path,
                                    auth=self.auth, verify=False,
                                    json=data,
                                    headers=self.headers)
        return self._finish_request(response, wait_for_job)

    def get(self, path):
        """
        Get a resource object from the specified path.

        :param path: resource path (may include a query string).
        :return: the ``requests`` response object.
        """
        response = requests.get(self.base_url + path,
                                auth=self.auth, verify=False)
        return self._finish_request(response)

    def delete(self, path, wait_for_job=False):
        """
        Delete a resource from the specified path.

        :param path: resource path appended to the base URL.
        :param wait_for_job: when True and the response status is 202, block
            until the background job finishes.
        :return: the ``requests`` response object.
        """
        response = requests.delete(self.base_url + path,
                                   auth=self.auth, verify=False)
        return self._finish_request(response, wait_for_job)

    def find_resource(self, path, name, value):
        """
        Return the 'id' of the first resource matching ``name=value``.

        :param path: collection path to query.
        :param name: field name used in the query string.
        :param value: value the field must match (inserted as-is, not
            URL-encoded).
        :return: the ``id`` of the first record, or None when the response is
            not 200 or contains no records.
        """
        response = self.get('{path}?{field}={value}'.format(
            path=path, field=name, value=value))
        if response.status_code != 200:
            return None
        body = response.json()
        # A missing/None 'num_records' or 'records' is treated as "no match"
        # instead of raising TypeError/IndexError.
        records = body.get('records') or []
        if (body.get('num_records') or 0) >= 1 and records:
            return records[0].get('id')
        return None

    def get_num_records(self, path, query=None):
        """
        Return the number of records found in a container.

        :param path: collection path to query.
        :param query: optional query string (without the leading ``?``).
        :return: the ``num_records`` value of the response body, or None when
            the response status is not 200.
        """
        query_opt = '?{}'.format(query) if query else ''
        response = self.get('{path}{query}'.format(path=path, query=query_opt))
        if response.status_code == 200:
            return response.json().get('num_records')
        return None

    def resource_exists(self, path, name, value):
        """Return True when ``find_resource`` finds a matching resource."""
        return self.find_resource(path, name, value) is not None

    def wait_for_job(self, response, poll_timeout=120):
        """
        Poll a background job until it reaches a final state.

        :param response: decoded JSON body containing a ``job`` object with
            ``id``, ``last_modified`` and ``message`` keys.
        :param poll_timeout: value sent as the ``poll_timeout`` query
            parameter on every poll. Presumably the number of seconds the
            server may hold the request open -- confirm against the Deploy
            API documentation.
        :raises SystemExit: with exit code 1 when the job ends in 'failure'.
        """
        job = response['job']
        last_modified = job['last_modified']
        job_id = job['id']
        self.logger.info('Event: %s', job['message'])

        while True:
            response = self.get('/jobs/{}?fields=state,message&'
                                'poll_timeout={}&last_modified=>={}'.format(
                                    job_id, poll_timeout, last_modified))
            job_body = response.json().get('record', {})

            # Show interesting message updates
            self.logger.info('Event: %s', job_body.get('message', ''))

            # Refresh the last modified time for the poll loop. Keep the
            # previous value when the record does not carry one, so the next
            # poll never asks for 'last_modified=>=None'.
            last_modified = job_body.get('last_modified') or last_modified

            # Look for the final states
            state = job_body.get('state', 'unknown')
            if state == 'failure':
                self.logger.error('FAILED background job.\nJOB: %s', job_body)
                # End the script if a failure occurs. SystemExit is raised
                # directly because the exit() builtin is only installed by
                # the 'site' module and is not always available.
                raise SystemExit(1)
            if state == 'success':
                break

    def exit_on_errors(self, response):
        """
        Log and raise on an HTTP error response.

        Does nothing for status codes below 400. Otherwise the request URL,
        filtered headers and body are logged, and ``raise_for_status()``
        raises ``requests.exceptions.HTTPError``, which ends the script
        unless the caller catches it.
        """
        if response.status_code >= 400:
            self.logger.error('FAILED request to URL: %s\nHEADERS: %s\nRESPONSE BODY: %s',
                              response.request.url,
                              self.filter_headers(response),
                              response.text)
        response.raise_for_status()

    def _finish_request(self, response, wait_for_job=False):
        """
        Shared tail of every HTTP verb: log, check errors, optionally wait.

        :param response: the ``requests`` response object just received.
        :param wait_for_job: when True and the status is 202, the JSON body
            is handed to ``wait_for_job``.
        :return: ``response`` unchanged.
        """
        self.logger.debug('HEADERS: %s\nBODY: %s',
                          self.filter_headers(response), response.text)
        self.exit_on_errors(response)
        if wait_for_job and response.status_code == 202:
            self.wait_for_job(response.json())
        return response

    @staticmethod
    def filter_headers(response):
        """Return only the 'Location' and 'request-id' response headers."""
        return {key: response.headers[key]
                for key in ['Location', 'request-id']
                if key in response.headers}