-
matevzerzen authoredmatevzerzen authored
wazuh_client.py 1.44 KiB
# SPDX-License-Identifier: Apache-2.0
import json
import urllib3
class WazuhClient:
def __init__(self, ip, port, username, password, logger):
self._ip = ip
self._port = port
self._username = username
self._password = password
self._auth_token = None
self.logger = logger
def req(self, method, resource, data=None, headers={}, auth_retry=True):
# TODO: add cert verification
c = urllib3.HTTPSConnectionPool(self._ip, port=self._port, cert_reqs='CERT_NONE', assert_hostname=False)
url = "https://%s:%i/%s" % (self._ip, self._port, resource)
headers['Content-Type'] = 'application/json'
if self._auth_token:
headers['Authorization'] = 'Bearer %s' % self._auth_token
try:
resp = c.request(method, url, headers=headers, body=data)
except (TimeoutError, urllib3.exceptions.NewConnectionError,
urllib3.exceptions.MaxRetryError) as err:
self.logger.exception("Wazuh manager not available")
else:
if resp.status == 401:
if not auth_retry:
raise Exception("Authentication Error")
self._auth_token = None
self._login()
return self.req(method, resource, data, headers, auth_retry=False)
return json.loads(resp.data)
def _login(self):
login_endpoint = 'security/user/authenticate'
basic_auth = "%s:%s" % (self._username, self._password)
resp = self.req('GET', login_endpoint, headers=urllib3.make_headers(basic_auth=basic_auth), auth_retry=False)
self._auth_token = resp['data']['token']