    from datetime import datetime, timezone
    import os
    from random import randint
    from sys import maxsize

    from evidence import Evidence, simple_evidence
    from wazuhclient import WazuhClient
    
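    # Wazuh API client shared by the checks below. The connection parameters are
    # read from the environment and fall back to the values this script
    # originally hard-coded (a lab address and the stock Wazuh API account), so
    # existing setups keep working while a real deployment can supply its own
    # host and credentials without editing the source.
    wc = WazuhClient(
        os.environ.get('WAZUH_API_HOST', '192.168.33.10'),
        int(os.environ.get('WAZUH_API_PORT', '55000')),
        os.environ.get('WAZUH_API_USER', 'wazuh-wui'),
        os.environ.get('WAZUH_API_PASSWORD', 'wazuh-wui'),
    )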
    
    # Get (temporary) ID
    def get_id(reqId):
        """Return ``'<reqId>-<random int>'`` as a throwaway evidence identifier.

        The suffix is a non-cryptographic random integer in ``[0, sys.maxsize]``;
        it only serves to tell evidence records apart within a run.
        """
        suffix = str(randint(0, maxsize))
        return '-'.join((reqId, suffix))
    
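    # Get timestamp (can be changed according to our preferences)
    def get_timestamp():
        """Return the current UTC time formatted as ``YYYY-MM-DDTHH:MM:SSZ``.

        Uses a timezone-aware ``datetime`` because ``datetime.utcnow()`` is
        deprecated since Python 3.12; the formatted output is unchanged.
        """
        now = datetime.now(timezone.utc)
        return now.strftime('%Y-%m-%dT%H:%M:%SZ')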
    
    # Get list of all agent ids (including manager's)
    def get_agents(wc):
        """Return the raw ``GET agents`` response and the ids it lists.

        Args:
            wc: client exposing ``req(method, endpoint)``.

        Returns:
            ``(body, ids)`` where ``ids`` is
            ``[item['id'] for item in body['data']['affected_items']]``.

        Raises:
            KeyError: if the response lacks ``data``/``affected_items``/``id``.
        """
        body = wc.req('GET', 'agents')
        ids = [item['id'] for item in body['data']['affected_items']]
        return body, ids
    
    # Check if syscheck enabled
    def check_syscheck(wc, agent_id):
        """Produce evidence on whether syscheck (FIM) is enabled for one agent.

        Queries ``agents/<agent_id>/config/syscheck/syscheck`` and reports
        ``'true'`` when ``body['data']['syscheck']['disabled'] == 'no'``,
        otherwise ``'false'``. A KeyError propagates if the response lacks
        the expected keys.
        """
        endpoint = 'agents/' + agent_id + '/config/syscheck/syscheck'
        body = wc.req('GET', endpoint)
        enabled = body['data']['syscheck']['disabled'] == 'no'
        measurement = 'true' if enabled else 'false'
        return simple_evidence(get_id('05.3'), get_timestamp(), measurement, body)
    
    # Check if rootcheck enabled
    def check_rootcheck(wc, agent_id):
        """Produce evidence on whether rootcheck is enabled for one agent.

        Queries ``agents/<agent_id>/config/syscheck/rootcheck`` and reports
        ``'true'`` when ``body['data']['rootcheck']['disabled'] == 'no'``,
        otherwise ``'false'``. A KeyError propagates if the response lacks
        the expected keys.
        """
        endpoint = 'agents/' + agent_id + '/config/syscheck/rootcheck'
        body = wc.req('GET', endpoint)
        enabled = body['data']['rootcheck']['disabled'] == 'no'
        measurement = 'true' if enabled else 'false'
        return simple_evidence(get_id('05.3'), get_timestamp(), measurement, body)
    
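    # Check if there's at least one valid alerting service
    def check_alert_integrations(wc):
        """Produce evidence on whether the manager has an alerting channel set up.

        Reads ``manager/configuration`` and reports ``'true'`` when
        ``global.email_notification == 'yes'`` or an ``integration`` entry
        named ``slack`` or ``pagerduty`` exists, else ``'false'``.

        A missing or oddly shaped configuration section counts as "not
        configured". Only structural lookup errors are swallowed; the former
        bare ``except`` also hid ``KeyboardInterrupt`` and genuine bugs.
        """
        body = wc.req('GET', 'manager/configuration')

        # Exceptions that mean "this part of the configuration is absent".
        absent = (KeyError, IndexError, TypeError, AttributeError)

        try:
            config = body['data']['affected_items'][0]
        except absent:
            config = {}

        # Check email notifications integration
        try:
            email_notifications = config['global']['email_notification'] == 'yes'
        except absent:
            email_notifications = False

        # Check Slack and PagerDuty notifications integration
        slack_notifications = pagerduty_notifications = False
        try:
            for integration in config['integration']:
                name = integration.get('name')
                if name == 'slack':
                    slack_notifications = True
                elif name == 'pagerduty':
                    pagerduty_notifications = True
        except absent:
            # A malformed entry resets both flags, matching the original logic.
            slack_notifications = pagerduty_notifications = False

        configured = email_notifications or slack_notifications or pagerduty_notifications
        measurement_result = 'true' if configured else 'false'

        return simple_evidence(get_id('05.3'), get_timestamp(), measurement_result, body)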
    
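    # Check for VirusTotal integration
    def check_virus_total_integration(wc):
        """Produce evidence on whether a ``virustotal`` integration is configured.

        Reads ``manager/configuration`` and reports ``'true'`` if any entry of
        ``integration`` has ``name == 'virustotal'``, else ``'false'``.

        A missing or oddly shaped ``integration`` section counts as "not
        configured". Only structural lookup errors are swallowed; the former
        bare ``except`` also hid ``KeyboardInterrupt`` and genuine bugs.
        """
        body = wc.req('GET', 'manager/configuration')

        # Exceptions that mean "this part of the configuration is absent".
        absent = (KeyError, IndexError, TypeError, AttributeError)

        # Check VirusTotal integration
        try:
            integrations = body['data']['affected_items'][0]['integration']
            found = any(item.get('name') == 'virustotal' for item in integrations)
        except absent:
            found = False

        measurement_result = 'true' if found else 'false'

        return simple_evidence(get_id('05.3'), get_timestamp(), measurement_result, body)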
    
    # Check last Syscheck & Rootcheck scan times
    def check_last_scan_time(wc, agent_id):
        """Produce two evidence records: the last syscheck and rootcheck scan end times.

        Each measurement is ``body['data']['affected_items'][0]['end']`` from
        the ``<scanner>/<agent_id>/last_scan`` response; a KeyError or
        IndexError propagates if a response lacks the expected keys.

        Returns:
            ``(syscheck_evidence, rootcheck_evidence)``.
        """
        records = []
        for scanner in ('syscheck', 'rootcheck'):
            body = wc.req('GET', scanner + '/' + agent_id + '/last_scan')
            end_time = body['data']['affected_items'][0]['end']
            records.append(simple_evidence(get_id('05.4'), get_timestamp(), end_time, body))

        return tuple(records)