Skip to content
Snippets Groups Projects
Commit 3384ba8d authored by Matevz Erzen's avatar Matevz Erzen Committed by Matevz Erzen
Browse files

Added ClamAV install verification via Elasticsearch

parent 603b253e
No related branches found
No related tags found
No related merge requests found
# Evidence Collector
This project includes modules for collecting evidence regarding Wazuh and VAT.
## Wazuh evidence collector
Wazuh evidence collector uses [Wazuh's API](https://documentation.wazuh.com/current/user-manual/api/reference.html) to access information about manager's and agents' system informations and configurations. As an additional measure to ensure correct configuration of [ClamAV](https://www.clamav.net/) (if installed on machine) we also make use of [Elasticsearch's API](https://www.elastic.co/guide/en/elasticsearch/reference/current/search.html) to dirrectly access collected logs - Elastic stack is one of the Wazuh's required components (usually installed on the same machine as Wazuh server, but can be stand alone as well).
## Installation & use
1. Set up your Wazuh development environment
2. Clone this repository
3. Install requirements
```
pip install -r requirements.txt
```
4. Run test script
```
python3 test.py
```
### Setting up Wazuh development environment
Use [Security Monitoring](https://gitlab.xlab.si/medina/security-monitoring) repository to create and deploy Vagrant box with all required components.
### API User authentication
Current implementation has disabled SSL certificate verification & uses simple username/password verification. Production version should change this with cert verification.
### Manual Elasticsearch API testin with cURL
Example command for testing the API via CLI:
```
curl --user admin:changeme --insecure -X GET "https://192.168.33.10:9200/wazuh-alerts*/_search?pretty" -H 'Content-Type: application/json' -d'
{"query": {
"bool": {
"must": [{"match": {"predecoder.program_name": "clamd"}},
{"match": {"rule.description": "Clamd restarted"}},
{"match": {"agent.id": "001"}}]
}
}
}'
```
## Known issues
### Python Elasticsearch library problems with ODFE
Latest versions (`7.14.0` & `7.15.0`) of Python Elasticsearch library have problems connecting to Open Distro for Elasticsearch and produce the following error when trying to do so:
```
elasticsearch.exceptions.UnsupportedProductError: The client noticed that the server is not a supported distribution of Elasticsearch
```
To resolve this, downgrade to older package version:
```
pip install 'elasticsearch<7.14.0'
```
elasticsearch_dsl==7.4.0
urllib3==1.25.8
elasticsearch==7.13.4
import pprint
from wazuh_evidence_collector import *
evidences = run_full_check()
for evidence in evidences:
pprint.pprint(evidence.__dict__)
from wazuhclient import WazuhClient
from evidence import Evidence, simple_evidence
from random import randint
from sys import maxsize
from datetime import datetime
import pprint
wc = WazuhClient('192.168.33.10', 55000, 'wazuh-wui', 'wazuh-wui')
# Get (temporary) ID
def get_id(reqId):
return reqId + '-' + str(randint(0, maxsize))
# Get timestamp (can be changed according to our preferences)
def get_timestamp():
ts = datetime.utcnow()
return ts.strftime('%Y-%m-%dT%H:%M:%SZ')
# Get list of all agent ids (including manager's)
def get_agents(wc):
body = wc.req('GET', 'agents')
agents_ids = []
for agent in body['data']['affected_items']:
agents_ids.append(agent['id'])
return body, agents_ids
# Check if syscheck enabled
def check_syscheck(wc, agent_id):
body = wc.req('GET', 'agents/' + agent_id + '/config/syscheck/syscheck')
measurement_result = ('true' if body['data']['syscheck']['disabled'] == 'no' else 'false')
evidence = simple_evidence(get_id('05.3'), get_timestamp(), measurement_result, body)
return evidence
# Check if rootcheck enabled
def check_rootcheck(wc, agent_id):
body = wc.req('GET', 'agents/' + agent_id + '/config/syscheck/rootcheck')
measurement_result = ('true' if body['data']['rootcheck']['disabled'] == 'no' else 'false')
evidence = simple_evidence(get_id('05.3'), get_timestamp(), measurement_result, body)
return evidence
# Check if there's at least one valid alerting service
def check_alert_integrations(wc):
body = wc.req('GET', 'manager/configuration')
# Check email notifications integration
try:
email_notifications = (True if body['data']['affected_items'][0]['global']['email_notification'] == 'yes' else False)
except:
email_notifications = False
# Check Slack and PagerDuty notifications integration
try:
integrations = body['data']['affected_items'][0]['integration']
slack_notifications = pagerduty_notifications = False
for integration in integrations:
if integration['name'] == 'slack':
slack_notifications = True
if integration['name'] == 'pagerduty':
pagerduty_notifications = True
except:
slack_notifications = pagerduty_notifications = False
measurement_result = ('true' if email_notifications or slack_notifications or pagerduty_notifications else 'false')
evidence = simple_evidence(get_id('05.3'), get_timestamp(), measurement_result, body)
return evidence
# Check for VirusTotal integration
def check_virus_total_integration(wc):
body = wc.req('GET', 'manager/configuration')
# Check VirusTotal integration
try:
integrations = body['data']['affected_items'][0]['integration']
measurement_result = 'false'
for integration in integrations:
if integration['name'] == 'virustotal':
measurement_result = 'true'
break
except:
measurement_result = 'false'
evidence = simple_evidence(get_id('05.3'), get_timestamp(), measurement_result, body)
return evidence
# Check last Syscheck & Rootcheck scan times
# When producing 'real' evidence, make sure to provide differentiation between Syscheck and Rootcheck outputs.
def check_last_scan_time(wc, agent_id):
body = wc.req('GET', 'syscheck/' + agent_id + '/last_scan')
measurement_result = body['data']['affected_items'][0]['end']
evidence1 = simple_evidence(get_id('05.4'), get_timestamp(), measurement_result, body)
body = wc.req('GET', 'rootcheck/' + agent_id + '/last_scan')
measurement_result = body['data']['affected_items'][0]['end']
evidence2 = simple_evidence(get_id('05.4'), get_timestamp(), measurement_result, body)
return evidence1, evidence2
# Check if ClamAV daemon package installed
def check_clamd_install(wc, agent_id):
body = wc.req('GET', 'syscollector/' + agent_id + '/packages')
measurement_result = 'false'
for package in body['data']['affected_items']:
if package['name'] == 'clamd':
measurement_result = 'true'
break
evidence = simple_evidence(get_id('05.3'), get_timestamp(), measurement_result, body)
return evidence
# Check if ClamAV daemon process running
def check_clamd_process(wc, agent_id):
body = wc.req('GET', 'syscollector/' + agent_id + '/processes')
measurement_result = 'false'
for package in body['data']['affected_items']:
if package['name'] == 'clamd':
measurement_result = 'true'
break
evidence = simple_evidence(get_id('05.3'), get_timestamp(), measurement_result, body)
return evidence
File moved
from wazuh_client import WazuhClient
from elasticsearch import Elasticsearch
from elasticsearch_dsl import Search
from evidence import Evidence, simple_evidence
from random import randint
from sys import maxsize
from datetime import datetime
wc = WazuhClient('192.168.33.10', 55000, 'wazuh-wui', 'wazuh-wui')
es = Elasticsearch(
'192.168.33.10',
http_auth=('admin', 'changeme'),
scheme='https',
port=9200,
use_ssl=False,
verify_certs=False,
ssl_show_warn=False,
)
# Get (temporary) ID
def get_id(reqId):
return reqId + '-' + str(randint(0, maxsize))
# Get timestamp (can be changed according to our preferences)
def get_timestamp():
ts = datetime.utcnow()
return ts.strftime('%Y-%m-%dT%H:%M:%SZ')
# Wrapepr function that runs all the checks (for every manager/agent)
def run_full_check():
# Get list of all agent ids (including manager's)
def get_agents(wc):
body = wc.req('GET', 'agents')
agents_ids = []
for agent in body['data']['affected_items']:
agents_ids.append(agent['id'])
return body, agents_ids
body, agents_ids = get_agents(wc)
agent_evidences = []
for agent in agents_ids:
agent_evidences.append(wazuh_monitoring_enabled(wc, agent))
agent_evidences.append(malvare_protection_enabled(wc, es, agent))
return agent_evidences
# Check Wazuh's configuration
def wazuh_monitoring_enabled(wc, agent_id):
# Check if syscheck enabled
def check_syscheck(wc, agent_id):
body = wc.req('GET', 'agents/' + agent_id + '/config/syscheck/syscheck')
measurement_result = body['data']['syscheck']['disabled'] == 'no'
return body, measurement_result
# Check if rootcheck enabled
def check_rootcheck(wc, agent_id):
body = wc.req('GET', 'agents/' + agent_id + '/config/syscheck/rootcheck')
measurement_result = body['data']['rootcheck']['disabled'] == 'no'
return body, measurement_result
# Check if there's at least one valid alerting service
def check_alert_integrations(wc):
body = wc.req('GET', 'manager/configuration')
# Check email notifications integration
try:
email_notifications = (True if body['data']['affected_items'][0]['global']['email_notification'] == 'yes' else False)
except:
email_notifications = False
# Check Slack and PagerDuty notifications integration
try:
integrations = body['data']['affected_items'][0]['integration']
slack_notifications = pagerduty_notifications = False
for integration in integrations:
if integration['name'] == 'slack':
slack_notifications = True
if integration['name'] == 'pagerduty':
pagerduty_notifications = True
except:
slack_notifications = pagerduty_notifications = False
measurement_result = email_notifications or slack_notifications or pagerduty_notifications
return body, measurement_result
raw_evidence = []
evidence, result_syscheck = check_syscheck(wc, agent_id)
raw_evidence.append(evidence)
evidence, result_rootcheck = check_rootcheck(wc, agent_id)
raw_evidence.append(evidence)
evidence, result_aler_integration = check_alert_integrations(wc)
raw_evidence.append(evidence)
if result_syscheck and result_rootcheck and result_aler_integration:
return simple_evidence(get_id('05.3'), get_timestamp(), "true", raw_evidence)
else:
return simple_evidence(get_id('05.3'), get_timestamp(), "false", raw_evidence)
# Check if agent uses ClamAV or VirusTotal
def malvare_protection_enabled(wc, es, agent_id):
# Check for VirusTotal integration
def check_virus_total_integration(wc):
body = wc.req('GET', 'manager/configuration')
# Check VirusTotal integration
try:
integrations = body['data']['affected_items'][0]['integration']
measurement_result = False
for integration in integrations:
if integration['name'] == 'virustotal':
measurement_result = True
break
except:
measurement_result = False
return body, measurement_result
# Check if ClamAV daemon process running
def check_clamd_process(wc, agent_id):
body = wc.req('GET', 'syscollector/' + agent_id + '/processes')
measurement_result = False
for package in body['data']['affected_items']:
if package['name'] == 'clamd':
measurement_result = True
break
return body, measurement_result
# Check ClamAV logs in Elasticsearch
def check_clamd_logs_elastic(es, agent_id):
s = Search(using=es, index="wazuh-alerts-*") \
.query("match", predecoder__program_name="clamd") \
.query("match", rule__description="Clamd restarted") \
.query("match", agent__id=agent_id)
body = s.execute().to_dict()
measurement_result = len(body['hits']['hits']) > 0
return body, measurement_result
raw_evidence = []
evidence, result_virus_total = check_virus_total_integration(wc)
raw_evidence.append(evidence)
evidence, result_lamd_process = check_clamd_process(wc, agent_id)
raw_evidence.append(evidence)
evidence, result_clamd_logs = check_clamd_logs_elastic(es, agent_id)
raw_evidence.append(evidence)
if result_virus_total or (result_lamd_process and result_clamd_logs):
return simple_evidence(get_id('05.4'), get_timestamp(), "true", raw_evidence)
else:
return simple_evidence(get_id('05.4'), get_timestamp(), "false", raw_evidence)
# Check last Syscheck & Rootcheck scan times
# When producing 'real' evidence, make sure to provide differentiation between Syscheck and Rootcheck outputs.
def check_last_scan_time(wc, agent_id):
body = wc.req('GET', 'syscheck/' + agent_id + '/last_scan')
measurement_result = body['data']['affected_items'][0]['end']
evidence1 = simple_evidence(get_id('05.4'), get_timestamp(), measurement_result, body)
body = wc.req('GET', 'rootcheck/' + agent_id + '/last_scan')
measurement_result = body['data']['affected_items'][0]['end']
evidence2 = simple_evidence(get_id('05.4'), get_timestamp(), measurement_result, body)
return evidence1, evidence2
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment