Commit a0fe1815 authored by Matevz Erzen

VirusTotal and last scan checks

parent 2917af9e
......@@ -13,8 +13,9 @@ class Evidence:
self.measurement_result = measurement_result
self.body = body
def get_json(self):
def toJson(self):
return json.dumps(self.__dict__)
def simple_evidence(evidence_id, timestamp, measurement_result, body):
return Evidence(evidence_id, timestamp, None, None, None, None, None, measurement_result, body)
\ No newline at end of file
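Review bot: `simple_evidence` builds the object with five positional `None` arguments whose meaning is not visible in this hunk, so a reader cannot tell which optional fields are being skipped without opening the `Evidence` constructor (which lies outside the hunk). A one-line docstring naming the skipped fields, or passing them as keyword arguments, would make the helper self-describing. The rename of `get_json` to `toJson` also departs from the snake_case used elsewhere in this file; if callers are being updated anyway, `to_json` would be the conventional name.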
......@@ -2,11 +2,19 @@ from wazuhclient import WazuhClient
from evidence import Evidence, simple_evidence
from random import randint
from sys import maxsize
import json
import pprint
from datetime import datetime
wc = WazuhClient('192.168.33.10', 55000, 'wazuh-wui', 'wazuh-wui')
# Get (temporary) ID
def get_id(reqId):
return reqId + '-' + str(randint(0, maxsize))
# Get timestamp (can be changed according to our preferences)
def get_timestamp():
ts = datetime.utcnow()
return ts.strftime('%Y-%m-%dT%H:%M:%SZ')
# Get list of all agent ids (including manager's)
def get_agents(wc):
......@@ -18,27 +26,28 @@ def get_agents(wc):
return body, agents_ids
# Check if syscheck enabled
def get_syscheck(wc, agent_id):
def check_syscheck(wc, agent_id):
body = wc.req('GET', 'agents/' + agent_id + '/config/syscheck/syscheck')
measurement_result = ('true' if body['data']['syscheck']['disabled'] == 'no' else 'false')
return body, measurement_result
evidence = simple_evidence(get_id('05.3'), get_timestamp(), measurement_result, body)
return evidence
# Check if rootcheck enabled
def get_rootcheck(wc, agent_id):
def check_rootcheck(wc, agent_id):
body = wc.req('GET', 'agents/' + agent_id + '/config/syscheck/rootcheck')
measurement_result = ('true' if body['data']['rootcheck']['disabled'] == 'no' else 'false')
return body, measurement_result
evidence = simple_evidence(get_id('05.3'), get_timestamp(), measurement_result, body)
return evidence
# Check if there's at least one valid alerting service
def get_alert_integrations(wc):
def check_alert_integrations(wc):
body = wc.req('GET', 'manager/configuration')
# Check email notifications integration
......@@ -64,7 +73,43 @@ def get_alert_integrations(wc):
measurement_result = ('true' if email_notifications or slack_notifications or pagerduty_notifications else 'false')
return body, measurement_result
evidence = simple_evidence(get_id('05.3'), get_timestamp(), measurement_result, body)
return evidence
# Check for VirusTotal integration
def check_virus_total_integration(wc):
body = wc.req('GET', 'manager/configuration')
# Check VirusTotal integration
try:
integrations = body['data']['affected_items'][0]['integration']
measurement_result = 'false'
for integration in integrations:
if integration['name'] == 'virustotal':
measurement_result = 'true'
break
except:
measurement_result = 'false'
evidence = simple_evidence(get_id('05.3'), get_timestamp(), measurement_result, body)
return evidence
# Check last Syscheck & Rootcheck scan times
def check_last_scan_time(wc, agent_id):
body = wc.req('GET', 'syscheck/' + agent_id + '/last_scan')
measurement_result = body['data']['affected_items'][0]['end']
evidence1 = simple_evidence(get_id('05.4'), get_timestamp(), measurement_result, body)
body = wc.req('GET', 'rootcheck/' + agent_id + '/last_scan')
measurement_result = body['data']['affected_items'][0]['end']
evidence2 = simple_evidence(get_id('05.4'), get_timestamp(), measurement_result, body)
#pprint.pprint(wc.req('GET', 'sca/000'))
\ No newline at end of file
return evidence1, evidence2
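Review bot: The bare `except:` in `check_virus_total_integration` turns every failure — an HTTP error from `wc.req`, a missing `data`/`affected_items` key, even `KeyboardInterrupt` — into `measurement_result = 'false'`, so a broken API call is recorded as "no VirusTotal integration" and is indistinguishable from a real negative finding; narrow it to `except (KeyError, IndexError, TypeError):` and consider keeping the error text in the evidence body. `check_last_scan_time` has the opposite problem: it indexes `body['data']['affected_items'][0]['end']` twice with no guard, so an agent with no completed scan raises instead of producing evidence. In the first hunk of this file, `WazuhClient('192.168.33.10', 55000, 'wazuh-wui', 'wazuh-wui')` hard-codes the manager address and API credentials at module import time; reading them from the environment or a config file keeps the secret out of version control and stops the client from being constructed on mere import. The `@@ -64` hunk starts inside `check_alert_integrations`, whose beginning is outside the hunk, and `import pprint` now serves only the commented-out line near the end.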
import json
import urllib3
class WazuhClient:
def __init__(self, ip, port, username, password):
......