Support Zabbix 6.4
vasya3793 opened this issue · 0 comments
-
For information:
I found that the pyzabbix project is similar to your py-zabbix project:
it also creates a `pyzabbix` library and works with it.
When both are installed, `api.py` is overwritten.
I propose modifying the library to support Zabbix 6.4:
- remove the login from the `__init__` function and perform it in the login procedure instead.
I attach the source:
# -*- encoding: utf-8 -*-
#
# Copyright © 2014 Alexey Dubkov
#
# This file is part of py-zabbix.
#
# Py-zabbix is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Py-zabbix is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with py-zabbix. If not, see <http://www.gnu.org/licenses/>.
import base64
import json
import logging
import os
import ssl
import sys

from packaging.version import Version

# For Python 2 and 3 compatibility
try:
    import urllib2
except ImportError:
    # Since Python 3, urllib2.Request and urlopen were moved to
    # the urllib.request.
    import urllib.request as urllib2

from .logger import NullHandler, HideSensitiveFilter, HideSensitiveService
from .version import __version__
# Module-level logger. A NullHandler is attached so that importing the
# library never emits "no handler" warnings, and HideSensitiveFilter is
# installed on the logger for every record it produces.
null_handler = NullHandler()
# ``__name__`` (not a bare ``name``, which is undefined here) gives the
# logger this module's dotted path so applications can configure it.
logger = logging.getLogger(__name__)
logger.addHandler(null_handler)
logger.addFilter(HideSensitiveFilter())
class ZabbixAPIException(Exception):
    """ZabbixAPI exception class.

    When constructed with a single ``dict`` argument, the dict is treated
    as an API error object and its ``message``, ``code``, ``data`` and
    ``json`` entries are exposed as attributes of the same name (``None``
    when a key is absent). The ``json`` entry is passed through
    ``HideSensitiveService.hide_sensitive`` and written back into the dict
    in place, so the exception's ``args`` carry the processed text too.

    When constructed any other way (for example with a plain message
    string), ``error``, ``code``, ``data`` and ``json`` are ``None`` and
    ``message`` is the sole positional argument, or ``''`` when there is
    not exactly one. Handlers can therefore read these attributes without
    first checking how the exception was built.

    :code list:
    :32602: Invalid params (eg already exists)
    :32500: No permissions
    """

    def __init__(self, *args):
        # Name this class (not ``Exception``) so the MRO is walked from the
        # right starting point.
        super(ZabbixAPIException, self).__init__(*args)

        self.error = None
        self.code = None
        self.data = None
        self.json = None
        self.message = args[0] if len(args) == 1 else ''

        if len(args) == 1 and isinstance(args[0], dict):
            error = args[0]
            # Deliberately mutate the caller's dict: it is also stored in
            # ``self.args``, which is what ``str(exc)`` renders.
            if 'json' in error:
                error['json'] = HideSensitiveService.hide_sensitive(
                    error['json'])
            self.error = error
            self.message = error.get('message')
            self.code = error.get('code')
            self.data = error.get('data')
            self.json = error.get('json')
class ZabbixAPIObjectClass(object):
    """Proxy for a single Zabbix API method group.

    Attribute access on an instance yields a callable that forwards to
    ``parent.do_request`` using ``<group>.<attribute>`` as the method name.

    :type group: str
    :param group: Zabbix API method group name.
        Example: `apiinfo.version` method it will be `apiinfo`.

    :type parent: :class:`zabbix.api.ZabbixAPI` object
    :param parent: ZabbixAPI object to use as parent.
    """

    def __init__(self, group, parent):
        self.parent = parent
        self.group = group

    def __getattr__(self, name):
        """Build a callable for the API method ``<group>.<name>``.

        The returned callable accepts either positional arguments or
        keyword arguments (never both) and returns the ``result`` entry of
        the response produced by ``parent.do_request``.

        :type name: str
        :param name: Zabbix API method name.
            Example: `apiinfo.version` method it will be `version`.
        """

        def fn(*args, **kwargs):
            # The request carries a single ``params`` value, so a mix of
            # positional and keyword arguments cannot be represented.
            if args and kwargs:
                raise TypeError("Found both args and kwargs")

            method = '{0}.{1}'.format(self.group, name)
            logger.debug("Call %s method", method)

            params = args or kwargs
            response = self.parent.do_request(method, params)
            return response['result']

        return fn
def ssl_context_compat(func):
    """Decorate *func* so it is called with a non-verifying SSL context.

    The returned wrapper takes a single request argument. On interpreters
    at or above the per-major-version minimum (2.7.9 / 3.4.3) it calls
    ``func(req, context=ctx)`` where ``ctx`` has hostname checking and
    certificate verification switched off; on older interpreters it calls
    ``func(req)`` unchanged.
    """

    def inner(req):
        # We should explicitly disable cert verification to support
        # self-signed certs with urllib2 since Python 2.7.9 and 3.4.3
        fallback = (2, 7, 9)
        minimum_by_major = {2: fallback, 3: (3, 4, 3)}
        required = minimum_by_major.get(sys.version_info[0], fallback)

        if sys.version_info[0:3] < required:
            return func(req)

        # Create default context to skip SSL cert verification.
        # check_hostname must be cleared before verify_mode is relaxed.
        ctx = ssl.create_default_context()
        ctx.check_hostname = False
        ctx.verify_mode = ssl.CERT_NONE
        return func(req, context=ctx)

    return inner
@ssl_context_compat
def urlopen(*args, **kwargs):
    """Forward all arguments to ``urllib2.urlopen`` and return its result.

    The ``ssl_context_compat`` decorator replaces this function with its
    single-argument wrapper, so callers invoke it as ``urlopen(req)``.
    """
    response = urllib2.urlopen(*args, **kwargs)
    return response
class ZabbixAPI(object):
    """ZabbixAPI class, implement interface to zabbix api.

    :type url: str
    :param url: URL to zabbix api. Default: `ZABBIX_URL` or
        `https://localhost/zabbix`

    :type use_authenticate: bool
    :param use_authenticate: Use `user.authenticate` method if `True` else
        `user.login`.

    :type use_basic_auth: bool
    :param use_basic_auth: Using basic auth if `True`

    :type user: str
    :param user: Zabbix user name. Default: `ZABBIX_USER` or `'Admin'`.

    :type password: str
    :param password: Zabbix user password. Default `ZABBIX_PASSWORD` or
        `zabbix`.

    :type detect_version: bool
    :param detect_version: Query `apiinfo.version` before logging in and
        store it in ``self.version``. The detected version selects the
        parameter name sent to `user.login` (``username`` from 5.4.0,
        ``user`` otherwise). When `False`, ``self.version`` stays `None`
        and ``user`` is sent.

    >>> from pyzabbix import ZabbixAPI
    >>> z = ZabbixAPI('https://zabbix.server', user='Admin', password='zabbix')
    >>> # Get API Version
    >>> z.apiinfo.version()
    >>> u'2.2.1'
    >>> # or
    >>> z.do_request('apiinfo.version')
    >>> {u'jsonrpc': u'2.0', u'result': u'2.2.1', u'id': u'1'}
    >>> # Get all disabled hosts
    >>> z.host.get(status=1)
    >>> # or
    >>> z.do_request('host.getobjects', {'status': 1})
    """

    def __init__(self, url=None, use_authenticate=False, use_basic_auth=False, user=None,
                 password=None, detect_version=True):
        url = url or os.environ.get('ZABBIX_URL') or 'https://localhost/zabbix'
        user = user or os.environ.get('ZABBIX_USER') or 'Admin'
        password = password or os.environ.get('ZABBIX_PASSWORD') or 'zabbix'

        self.use_authenticate = use_authenticate
        self.use_basic_auth = use_basic_auth
        self.auth = None
        # Both attributes must exist before _login() runs: any attribute
        # that is missing would be answered by __getattr__ with an API
        # proxy object instead of a real value.
        self.version = None
        self._detect_version = detect_version
        self.url = url + '/api_jsonrpc.php'
        self.base64_cred = self.cred_to_base64(user, password) if self.use_basic_auth else None

        # NOTE: the fallbacks above mean both values are always set here,
        # so the constructor always logs in.
        if user is not None and password is not None:
            self._login(user, password)
        logger.debug("JSON-PRC Server: %s", self.url)

    def login(self, user='', password=''):
        """Log in to the zabbix server (public wrapper around `_login`).

        :type user: str
        :param user: Zabbix user

        :type password: str
        :param password: Zabbix user password
        """
        self._login(user, password)

    def __getattr__(self, name):
        """Dynamically create an object class (ie: host).

        :type name: str
        :param name: Zabbix API method group name.
            Example: `apiinfo.version` method it will be `apiinfo`.

        :raises AttributeError: for names starting with an underscore.
            Those are never API groups; answering them with a proxy makes
            protocol probes such as ``__deepcopy__`` or ``hasattr`` checks
            on private attributes misbehave.
        """
        if name.startswith('_'):
            raise AttributeError(
                "'{0}' object has no attribute '{1}'".format(
                    type(self).__name__, name))
        return ZabbixAPIObjectClass(name, self)

    def _login(self, user='', password=''):
        """Do login to zabbix server.

        Stores the value returned by the login call in ``self.auth``.

        :type user: str
        :param user: Zabbix user

        :type password: str
        :param password: Zabbix user password
        """
        logger.debug("ZabbixAPI.login({0},{1})".format(user, HideSensitiveService.HIDEMASK))

        if self._detect_version:
            self.version = Version(self.api_version())
            logger.info("Zabbix API version is: %s", self.version)

        # Exactly one login call is made. Logging in with ``user=`` first
        # and then again with the version-appropriate name would fail on
        # servers that only accept ``username``.
        self.auth = None
        if self.use_authenticate:
            self.auth = self.user.authenticate(user=user, password=password)
        elif self.version is not None and self.version >= Version("5.4.0"):
            self.auth = self.user.login(username=user, password=password)
        else:
            self.auth = self.user.login(user=user, password=password)

    def _logout(self):
        """Do logout from zabbix server.

        Does nothing when there is no auth token. The token is cleared
        only when `user.logout` returns a truthy result.
        """
        if self.auth:
            logger.debug("ZabbixAPI.logout()")
            if self.user.logout():
                self.auth = None

    def __enter__(self):
        """Return the instance itself for use in a ``with`` statement."""
        return self

    def __exit__(self, *args):
        """Log out when leaving the ``with`` block."""
        self._logout()

    @staticmethod
    def cred_to_base64(user, password):
        """Create header for basic authorization

        :type user: str
        :param user: Zabbix user

        :type password: str
        :param password: Zabbix user password

        :return: str -- base64 encoding of ``user:password``
        """
        base64string = base64.b64encode('{}:{}'.format(user, password).encode())
        return base64string.decode()

    def api_version(self):
        """Return version of server Zabbix API.

        :rtype: str
        :return: Version of server Zabbix API.
        """
        return self.apiinfo.version()

    def do_request(self, method, params=None):
        """Make request to Zabbix API.

        :type method: str
        :param method: ZabbixAPI method, like: `apiinfo.version`.

        :type params: str
        :param params: ZabbixAPI method arguments.

        :rtype: dict
        :return: The decoded JSON response.

        :raises ZabbixAPIException: when handling the request raises
            `ValueError` (for example the response is not valid JSON), or
            when the response contains an ``error`` member.

        >>> from pyzabbix import ZabbixAPI
        >>> z = ZabbixAPI()
        >>> apiinfo = z.do_request('apiinfo.version')
        """
        request_json = {
            'jsonrpc': '2.0',
            'method': method,
            'params': params or {},
            'id': '1',
        }

        # apiinfo.version and user.login doesn't require auth token
        if self.auth and (method not in ('apiinfo.version', 'user.login')):
            request_json['auth'] = self.auth

        # Serialise once and reuse the text for both the log and the body.
        payload = json.dumps(request_json)
        logger.debug(
            'urllib2.Request({0}, {1})'.format(
                self.url,
                payload))

        data = payload
        if not isinstance(data, bytes):
            data = data.encode("utf-8")

        req = urllib2.Request(self.url, data)
        req.get_method = lambda: 'POST'
        req.add_header('Content-Type', 'application/json-rpc')
        req.add_header('User-Agent', 'py-zabbix/{}'.format(__version__))

        if self.use_basic_auth:
            req.add_header("Authorization", "Basic {}".format(self.base64_cred))

        try:
            res = urlopen(req)
            res_str = res.read().decode('utf-8')
            res_json = json.loads(res_str)
        except ValueError as e:
            # Exceptions have no ``.message`` attribute on Python 3;
            # formatting the exception itself works on both 2 and 3.
            raise ZabbixAPIException("Unable to parse json: %s" % e)

        res_str = json.dumps(res_json, indent=4, separators=(',', ': '))
        logger.debug("Response Body: %s", res_str)

        if 'error' in res_json:
            err = res_json['error'].copy()
            err.update({'json': str(request_json)})
            raise ZabbixAPIException(err)

        return res_json

    def get_id(self, item_type, item=None, with_id=False, hostid=None, **args):
        """Return id or ids of zabbix objects.

        :type item_type: str
        :param item_type: Type of zabbix object. (eg host, item etc.)

        :type item: str
        :param item: Name of zabbix object. If it is `None`, return list of
            all objects in the scope.

        :type with_id: bool
        :param with_id: Returned values will be in zabbix json `id` format.
            Example: `{'itemid': 128}`

        :type name: bool
        :param name: Return name instead of id.

        :type hostid: int
        :param hostid: Filter objects by specific hostid.

        :type templateids: int
        :param templateids: Filter objects which only belong to specific
            templates by template id.

        :type app_name: str
        :param app_name: Filter object which only belong to specific
            application.

        :rtype: int or list
        :return: Return single `id`, `name` or list of values. `None` when
            the server returned no objects, or when a single `item` was
            requested and nothing matched.
        """
        result = None
        name = args.get('name', False)

        type_ = '{item_type}.get'.format(item_type=item_type)

        # Object types whose "name" lives in a differently named field.
        item_filter_name = {
            'mediatype': 'description',
            'trigger': 'description',
            'triggerprototype': 'description',
            'user': 'alias',
            'usermacro': 'macro',
        }

        # Object types whose id field is not simply `<item_type>id`.
        item_id_name = {
            'discoveryrule': 'item',
            'graphprototype': 'graph',
            'hostgroup': 'group',
            'itemprototype': 'item',
            'map': 'selement',
            'triggerprototype': 'trigger',
            'usergroup': 'usrgrp',
            'usermacro': 'hostmacro',
        }

        name_field = item_filter_name.get(item_type, 'name')
        templateids = args.get('templateids')

        filter_ = {
            'filter': {
                name_field: item,
            },
            'output': 'extend'}

        if hostid:
            filter_['filter'].update({'hostid': hostid})

        if templateids:
            if item_type == 'usermacro':
                filter_['hostids'] = templateids
            else:
                filter_['templateids'] = templateids

        if args.get('app_name'):
            filter_['application'] = args['app_name']

        logger.debug(
            'do_request( "{type}", {filter} )'.format(
                type=type_,
                filter=filter_))
        response = self.do_request(type_, filter_)['result']

        if response:
            item_id_str = item_id_name.get(item_type, item_type)
            item_id = '{item}id'.format(item=item_id_str)
            result = []
            for obj in response:
                # Check if object not belong current template
                if templateids:
                    if (obj.get('templateid') not in ("0", None) or
                            obj.get('templateids')):
                        continue

                if name:
                    result.append(obj.get(name_field))
                elif with_id:
                    result.append({item_id: int(obj.get(item_id))})
                else:
                    result.append(int(obj.get(item_id)))

        # A single named item unwraps to a scalar. Guard against an empty
        # or missing result so "not found" yields None instead of raising
        # on `None[0]` / `[][0]`.
        list_types = (list, type(None))
        if not isinstance(item, list_types):
            result = result[0] if result else None

        return result