Skip to content
Snippets Groups Projects
wazuh_client.py 1.44 KiB
# SPDX-License-Identifier: Apache-2.0

import json
import urllib3

class WazuhClient:
	
	def __init__(self, ip, port, username, password, logger):
		self._ip = ip
		self._port = port
		self._username = username
		self._password = password
		self._auth_token = None
		self.logger = logger
	
	def req(self, method, resource, data=None, headers={}, auth_retry=True):
		# TODO: add cert verification
		c = urllib3.HTTPSConnectionPool(self._ip, port=self._port, cert_reqs='CERT_NONE', assert_hostname=False)
		url = "https://%s:%i/%s" % (self._ip, self._port, resource)
		
		headers['Content-Type'] = 'application/json'
		if self._auth_token:
			headers['Authorization'] = 'Bearer %s' % self._auth_token
		
		try:
			resp = c.request(method, url, headers=headers, body=data)
		except (TimeoutError, urllib3.exceptions.NewConnectionError, 
        		urllib3.exceptions.MaxRetryError) as err:
			self.logger.exception("Wazuh manager not available")
		else:
			if resp.status == 401:
				if not auth_retry:
					raise Exception("Authentication Error")
				self._auth_token = None
				self._login()
				return self.req(method, resource, data, headers, auth_retry=False)

			return json.loads(resp.data)
	
	def _login(self):
		login_endpoint = 'security/user/authenticate'
		basic_auth = "%s:%s" % (self._username, self._password)
		resp = self.req('GET', login_endpoint, headers=urllib3.make_headers(basic_auth=basic_auth), auth_retry=False)
		self._auth_token = resp['data']['token']