# SPDX-License-Identifier: Apache-2.0 import json import urllib3 class WazuhClient: def __init__(self, ip, port, username, password, logger): self._ip = ip self._port = port self._username = username self._password = password self._auth_token = None self.logger = logger def req(self, method, resource, data=None, headers={}, auth_retry=True): # TODO: add cert verification c = urllib3.HTTPSConnectionPool(self._ip, port=self._port, cert_reqs='CERT_NONE', assert_hostname=False) url = "https://%s:%i/%s" % (self._ip, self._port, resource) headers['Content-Type'] = 'application/json' if self._auth_token: headers['Authorization'] = 'Bearer %s' % self._auth_token try: resp = c.request(method, url, headers=headers, body=data) except (TimeoutError, urllib3.exceptions.NewConnectionError, urllib3.exceptions.MaxRetryError) as err: self.logger.exception("Wazuh manager not available") else: if resp.status == 401: if not auth_retry: raise Exception("Authentication Error") self._auth_token = None self._login() return self.req(method, resource, data, headers, auth_retry=False) return json.loads(resp.data) def _login(self): login_endpoint = 'security/user/authenticate' basic_auth = "%s:%s" % (self._username, self._password) resp = self.req('GET', login_endpoint, headers=urllib3.make_headers(basic_auth=basic_auth), auth_retry=False) self._auth_token = resp['data']['token']