Skip to content
Snippets Groups Projects
Select Git revision
  • 530ddad1b8d287c3d019f663337bd3944e13e9ee
  • main default
2 results

wazuh_client.py

Blame
  • wazuh_client.py 1.41 KiB
    import json
    import urllib3
    
    class WazuhClient:
    	"""Minimal HTTPS/JSON client for a Wazuh manager API.

    	The client authenticates lazily: the first request that the server
    	answers with HTTP 401 triggers a login (basic auth against
    	``security/user/authenticate``), stores the returned bearer token and
    	replays the request once.
    	"""

    	def __init__(self, ip, port, username, password, logger):
    		"""Store connection settings; no network traffic happens here.

    		Args:
    			ip: Host name or IP address of the Wazuh manager.
    			port: TCP port of the API (formatted with ``%i`` into the URL).
    			username: User name for basic authentication at login.
    			password: Password for basic authentication at login.
    			logger: Object exposing ``error(msg)``, used to report
    				connection failures.
    		"""
    		self._ip = ip
    		self._port = port
    		self._username = username
    		self._password = password
    		# Bearer token obtained by _login(); None until the first login.
    		self._auth_token = None
    		self.logger = logger

    	def _build_headers(self, headers=None):
    		"""Return a fresh header dict for one request.

    		The caller's mapping is copied, never mutated, so headers (in
    		particular ``Authorization``) cannot leak between calls or between
    		client instances.

    		Args:
    			headers: Optional mapping of extra headers supplied by the caller.

    		Returns:
    			A new dict containing the caller's headers, a JSON
    			``Content-Type`` and, when a token is held, a bearer
    			``Authorization`` header (which overrides any caller-supplied one).
    		"""
    		merged = dict(headers) if headers else {}
    		merged['Content-Type'] = 'application/json'
    		if self._auth_token:
    			merged['Authorization'] = 'Bearer %s' % self._auth_token
    		return merged

    	def req(self, method, resource, data=None, headers=None, auth_retry=True):
    		"""Send one request to the manager and return the decoded JSON body.

    		Args:
    			method: HTTP verb, e.g. ``'GET'``.
    			resource: Path relative to the API root (no leading slash).
    			data: Optional request body, passed through unchanged.
    			headers: Optional mapping of extra headers; it is not modified.
    			auth_retry: When True, an HTTP 401 response triggers a login and
    				a single replay of the request.

    		Returns:
    			The response body parsed with ``json.loads``.

    		Raises:
    			Exception: "Authentication Error" if the server answers 401 and
    				``auth_retry`` is False (including a failed login).
    			TimeoutError, urllib3.exceptions.NewConnectionError,
    			urllib3.exceptions.MaxRetryError: Re-raised after being logged
    				when the manager cannot be reached.
    		"""
    		url = "https://%s:%i/%s" % (self._ip, self._port, resource)
    		request_headers = self._build_headers(headers)

    		# TODO: add cert verification
    		# NOTE(review): CERT_NONE disables TLS certificate validation, so the
    		# credentials and token are exposed to man-in-the-middle attacks.
    		pool = urllib3.HTTPSConnectionPool(self._ip, port=self._port, cert_reqs='CERT_NONE', assert_hostname=False)
    		try:
    			resp = pool.request(method, url, headers=request_headers, body=data)
    		except (TimeoutError, urllib3.exceptions.NewConnectionError,
    				urllib3.exceptions.MaxRetryError) as err:
    			self.logger.error(err)
    			self.logger.error("Wazuh manager not available")
    			# Propagate the real failure; there is no response to inspect.
    			raise
    		finally:
    			# The body is preloaded by request(), so the pool (created per
    			# call) can be released immediately instead of leaking sockets.
    			pool.close()

    		if resp.status == 401:
    			if not auth_retry:
    				raise Exception("Authentication Error")
    			self._auth_token = None
    			self._login()
    			# Replay with the caller's original headers; the new token is
    			# added when the headers are rebuilt.
    			return self.req(method, resource, data, headers, auth_retry=False)

    		return json.loads(resp.data)

    	def _login(self):
    		"""Authenticate with basic auth and store the returned bearer token.

    		Raises:
    			Exception: "Authentication Error" if the credentials are rejected.
    		"""
    		login_endpoint = 'security/user/authenticate'
    		basic_auth = "%s:%s" % (self._username, self._password)
    		# Drop any stale token first, otherwise it would replace the basic
    		# Authorization header built below.
    		self._auth_token = None
    		resp = self.req('GET', login_endpoint, headers=urllib3.make_headers(basic_auth=basic_auth), auth_retry=False)
    		self._auth_token = resp['data']['token']