Commit eef54a04 authored by Zitnik, Anze

Merge branch 'develop' into 'master'

Added initial Docker image

See merge request medina/evidence-collector!1
parents c0b92a96 254dd879
__pycache__/
*.pyc
*$py.class
.idea/
dump.rdb
.git
.cache
.gitignore
.gitlab-ci.yml
__pycache__/
*.pyc
*$py.class
.idea/
dump.rdb
image: nexus-registry.xlab.si:5001/docker:dind

variables:
  REGISTRY: registry-gitlab.xlab.si

before_script:
  - export SERVICE=$(grep SERVICE MANIFEST | cut -d '=' -f2)
  - export VERSION=$(grep VERSION MANIFEST | cut -d '=' -f2)

stages:
  - build
  - test
  - push

build:
  stage: build
  script:
    - docker build --no-cache -t $REGISTRY/medina/$SERVICE:$VERSION .
  only:
    - develop
    - master

test:
  stage: test
  script:
    - echo "not yet implemented"
  only:
    - develop
    - master

push:
  stage: push
  script:
    - docker login -u gitlab-ci-token -p $CI_JOB_TOKEN $REGISTRY
    - docker tag $REGISTRY/medina/$SERVICE:$VERSION $REGISTRY/medina/$SERVICE:latest
    - docker push $REGISTRY/medina/$SERVICE:$VERSION
    - docker push $REGISTRY/medina/$SERVICE:latest
    - docker logout $REGISTRY
  only:
    - master
# syntax=docker/dockerfile:1
FROM nexus-registry.xlab.si:5001/python:3.8-slim-buster
WORKDIR /evidence-collector
COPY requirements.txt requirements.txt
RUN pip3 install -r requirements.txt
COPY . .
RUN apt-get update && apt-get install -y redis-server jq
ENTRYPOINT ["./entrypoint.sh"]
VERSION=v0.0.1
SERVICE=evidence-collector
# Evidence Collector
This project includes modules for collecting evidence regarding Wazuh and VAT.
## Wazuh evidence collector
Wazuh evidence collector uses [Wazuh's API](https://documentation.wazuh.com/current/user-manual/api/reference.html) to access information about the manager's and agents' system configuration. As an additional measure to ensure the correct configuration of [ClamAV](https://www.clamav.net/) (if installed on the machine), we also use [Elasticsearch's API](https://www.elastic.co/guide/en/elasticsearch/reference/current/search.html) to directly access the collected logs - the Elastic Stack is one of Wazuh's required components (usually installed on the same machine as the Wazuh server, but it can be standalone as well).
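For illustration, here is a minimal sketch (not production code) of how the two APIs can be queried from Python, using this repository's `WazuhClient` and the `elasticsearch` package with the development credentials from `constants.json`:
```
from wazuh_evidence_collector.wazuh_client import WazuhClient
from elasticsearch import Elasticsearch

# Wazuh API: list registered agents (manager included).
wc = WazuhClient('192.168.33.10', 55000, 'wazuh-wui', 'wazuh-wui')
agents = wc.req('GET', 'agents')

# Elasticsearch API: search collected Wazuh alerts (certificate verification disabled).
es = Elasticsearch('192.168.33.10', port=9200, scheme='https',
                   http_auth=('admin', 'changeme'),
                   verify_certs=False, ssl_show_warn=False)
alerts = es.search(index='wazuh-alerts-*', body={'query': {'match_all': {}}})
```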
## Installation & use
### Using docker
> Note: The Docker image is not yet complete and might not work due to recent changes around the scheduler.
1. Set up your Wazuh development environment. Use the [Security Monitoring](https://gitlab.xlab.si/medina/security-monitoring) repository to create and deploy a Vagrant box with all the required components.
2. Clone this repository.
3. Build Docker image:
```
docker build -t evidence-collector .
```
4. Run the image:
```
docker run evidence-collector
```
> Note: The current simple image runs code from `test.py`. If you wish to test anything else, change this file or edit the `Dockerfile`.
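For example, a different module can be run without rebuilding by overriding the entrypoint (hypothetical invocation, assuming the image name from step 3):
```
docker run --entrypoint python3 evidence-collector -m scheduler.scheduler
```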
### Local environment
1. Set up your Wazuh development environment. Use the [Security Monitoring](https://gitlab.xlab.si/medina/security-monitoring) repository to create and deploy a Vagrant box with all the required components.
2. Clone this repository.
3. Install dependencies:
```
pip install -r requirements.txt
sudo apt-get install jq
```
4. a) Install Redis server locally:
```
sudo apt-get install redis-server
```
> Note: To stop the Redis server, use `/etc/init.d/redis-server stop`.
4. b) Run Redis server in a Docker container:
```
docker run --name my-redis-server -p 6379:6379 -d redis
```
In this case, also comment out the server start command in `entrypoint.sh`:
```
#redis-server &
```
5. Run `entrypoint.sh`:
```
./entrypoint.sh
```
> Note: This repository consists of multiple Python modules. When running Python code manually, the `-m` flag might be necessary.
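For example, the scheduler is started as a module (this is also what `entrypoint.sh` does):
```
python3 -m scheduler.scheduler
```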
## Component configuration
### API User authentication
The current implementation disables SSL certificate verification and uses simple username/password authentication (defined in `constants.json`). A production version should replace this with certificate verification.
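The relevant credentials are read from the `wazuh` and `elastic` sections of `constants.json`, for example:
```
{
  "wazuh": {"ip": "192.168.33.10", "port": 55000, "username": "wazuh-wui", "password": "wazuh-wui"},
  "elastic": {"ip": "192.168.33.10", "port": 9200, "username": "admin", "password": "changeme"}
}
```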
### Manual Elasticsearch API testing with cURL
Example command for testing the API via CLI:
```
curl --user admin:changeme --insecure -X GET "https://192.168.33.10:9200/wazuh-alerts*/_search?pretty" -H 'Content-Type: application/json' -d'
{
  "query": {
    "bool": {
      "must": [
        {"match": {"predecoder.program_name": "clamd"}},
        {"match": {"rule.description": "Clamd restarted"}},
        {"match": {"agent.id": "001"}}
      ]
    }
  }
}'
```
### Running [RQ](https://github.com/rq/rq) and [RQ-scheduler](https://github.com/rq/rq-scheduler) locally
1. Install (if needed) and run `redis-server`:
```
sudo apt-get install redis-server
redis-server
```
> Note: By default, server listens on port `6379`. Take this into consideration when starting other components.
2. Install RQ and RQ-scheduler:
```
pip install rq
pip install rq-scheduler
```
3. Run both components in 2 terminals:
```
rqworker low
rqscheduler --host localhost --port 6379
```
> Note: `low` in the first command references the task queue the worker will use.
4. Run a Python script containing RQ commands as usual:
```
python3 ...
```
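A minimal sketch of such a script, mirroring what `scheduler/scheduler.py` does with the `low` queue assumed above:
```
from redis import Redis
from rq import Queue
from rq_scheduler import Scheduler

from wazuh_evidence_collector import wazuh_evidence_collector

redis = Redis('localhost', 6379)
queue = Queue('low', connection=redis)
scheduler = Scheduler(connection=redis)

# Run the full evidence check every minute, 10 times in total.
scheduler.cron('* * * * *', func=wazuh_evidence_collector.run_full_check,
               queue_name='low', repeat=10, use_local_timezone=False)
```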
## Known issues
### Python Elasticsearch library problems with ODFE
The latest versions (`7.14.0` & `7.15.0`) of the Python Elasticsearch library have problems connecting to Open Distro for Elasticsearch, producing the following error:
```
elasticsearch.exceptions.UnsupportedProductError: The client noticed that the server is not a supported distribution of Elasticsearch
```
To resolve this, downgrade to an older version of the package:
```
pip install 'elasticsearch<7.14.0'
```
{
  "wazuh": {
    "ip": "192.168.33.10",
    "port": 55000,
    "username": "wazuh-wui",
    "password": "wazuh-wui"
  },
  "elastic": {
    "ip": "192.168.33.10",
    "port": 9200,
    "username": "admin",
    "password": "changeme"
  },
  "redis": {
    "ip": "localhost",
    "port": 6379,
    "queue": "low"
  }
}
#!/bin/bash

redis_ip=$(jq -r '.redis.ip' constants.json)
redis_port=$(jq -r '.redis.port' constants.json)
redis_queue=$(jq -r '.redis.queue' constants.json)

redis-server --port "$redis_port" &
rqworker "$redis_queue" &
rqscheduler --host "$redis_ip" --port "$redis_port" &

python3 -m scheduler.scheduler

tail -f /dev/null
import json


class Evidence:
    def __init__(self, evidence_id, timestamp, resource_id, tool, resource_type, feature_type, feature_property, measurement_result, raw):
        self.evidence_id = evidence_id
        self.timestamp = timestamp
        self.resource_id = resource_id
        self.tool = tool
        self.resource_type = resource_type
        self.feature_type = feature_type
        self.feature_property = feature_property
        self.measurement_result = measurement_result
        self.raw = raw

    def toJson(self):
        return json.dumps(self.__dict__)


# Build an Evidence object with only the commonly used fields set.
def simple_evidence(evidence_id, timestamp, resource_id, measurement_result, raw):
    return Evidence(evidence_id, timestamp, resource_id, None, None, None, None, measurement_result, raw)
# Generated by the protocol buffer compiler. DO NOT EDIT!
# source: proto/evidence.proto

import sys
_b=sys.version_info[0]<3 and (lambda x:x) or (lambda x:x.encode('latin1'))
from google.protobuf import descriptor as _descriptor
from google.protobuf import message as _message
from google.protobuf import reflection as _reflection
from google.protobuf import symbol_database as _symbol_database
# @@protoc_insertion_point(imports)

_sym_db = _symbol_database.Default()

from google.protobuf import struct_pb2 as google_dot_protobuf_dot_struct__pb2
from google.protobuf import timestamp_pb2 as google_dot_protobuf_dot_timestamp__pb2

DESCRIPTOR = _descriptor.FileDescriptor(
  name='proto/evidence.proto',
  package='',
  syntax='proto3',
  serialized_options=_b('Z\010evidence'),
  serialized_pb=_b('\n\x14proto/evidence.proto\x1a\x1cgoogle/protobuf/struct.proto\x1a\x1fgoogle/protobuf/timestamp.proto\"\xc1\x01\n\x08\x45vidence\x12\n\n\x02id\x18\x01 \x01(\t\x12\x12\n\nservice_id\x18\x02 \x01(\t\x12\x13\n\x0bresource_id\x18\x03 \x01(\t\x12-\n\ttimestamp\x18\x04 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\x12\x1a\n\x12\x61pplicable_metrics\x18\x05 \x03(\x05\x12\x0b\n\x03raw\x18\x06 \x01(\t\x12(\n\x08resource\x18\x07 \x01(\x0b\x32\x16.google.protobuf.ValueB\nZ\x08\x65videnceb\x06proto3'),
  dependencies=[google_dot_protobuf_dot_struct__pb2.DESCRIPTOR,google_dot_protobuf_dot_timestamp__pb2.DESCRIPTOR,])

_EVIDENCE = _descriptor.Descriptor(
  name='Evidence',
  full_name='Evidence',
  filename=None,
  file=DESCRIPTOR,
  containing_type=None,
  fields=[
    _descriptor.FieldDescriptor(
      name='id', full_name='Evidence.id', index=0,
      number=1, type=9, cpp_type=9, label=1,
      has_default_value=False, default_value=_b("").decode('utf-8'),
      message_type=None, enum_type=None, containing_type=None,
      is_extension=False, extension_scope=None,
      serialized_options=None, file=DESCRIPTOR),
    _descriptor.FieldDescriptor(
      name='service_id', full_name='Evidence.service_id', index=1,
      number=2, type=9, cpp_type=9, label=1,
      has_default_value=False, default_value=_b("").decode('utf-8'),
      message_type=None, enum_type=None, containing_type=None,
      is_extension=False, extension_scope=None,
      serialized_options=None, file=DESCRIPTOR),
    _descriptor.FieldDescriptor(
      name='resource_id', full_name='Evidence.resource_id', index=2,
      number=3, type=9, cpp_type=9, label=1,
      has_default_value=False, default_value=_b("").decode('utf-8'),
      message_type=None, enum_type=None, containing_type=None,
      is_extension=False, extension_scope=None,
      serialized_options=None, file=DESCRIPTOR),
    _descriptor.FieldDescriptor(
      name='timestamp', full_name='Evidence.timestamp', index=3,
      number=4, type=11, cpp_type=10, label=1,
      has_default_value=False, default_value=None,
      message_type=None, enum_type=None, containing_type=None,
      is_extension=False, extension_scope=None,
      serialized_options=None, file=DESCRIPTOR),
    _descriptor.FieldDescriptor(
      name='applicable_metrics', full_name='Evidence.applicable_metrics', index=4,
      number=5, type=5, cpp_type=1, label=3,
      has_default_value=False, default_value=[],
      message_type=None, enum_type=None, containing_type=None,
      is_extension=False, extension_scope=None,
      serialized_options=None, file=DESCRIPTOR),
    _descriptor.FieldDescriptor(
      name='raw', full_name='Evidence.raw', index=5,
      number=6, type=9, cpp_type=9, label=1,
      has_default_value=False, default_value=_b("").decode('utf-8'),
      message_type=None, enum_type=None, containing_type=None,
      is_extension=False, extension_scope=None,
      serialized_options=None, file=DESCRIPTOR),
    _descriptor.FieldDescriptor(
      name='resource', full_name='Evidence.resource', index=6,
      number=7, type=11, cpp_type=10, label=1,
      has_default_value=False, default_value=None,
      message_type=None, enum_type=None, containing_type=None,
      is_extension=False, extension_scope=None,
      serialized_options=None, file=DESCRIPTOR),
  ],
  extensions=[],
  nested_types=[],
  enum_types=[],
  serialized_options=None,
  is_extendable=False,
  syntax='proto3',
  extension_ranges=[],
  oneofs=[],
  serialized_start=88,
  serialized_end=281,
)

_EVIDENCE.fields_by_name['timestamp'].message_type = google_dot_protobuf_dot_timestamp__pb2._TIMESTAMP
_EVIDENCE.fields_by_name['resource'].message_type = google_dot_protobuf_dot_struct__pb2._VALUE
DESCRIPTOR.message_types_by_name['Evidence'] = _EVIDENCE
_sym_db.RegisterFileDescriptor(DESCRIPTOR)

Evidence = _reflection.GeneratedProtocolMessageType('Evidence', (_message.Message,), dict(
  DESCRIPTOR = _EVIDENCE,
  __module__ = 'proto.evidence_pb2'
  # @@protoc_insertion_point(class_scope:Evidence)
  ))
_sym_db.RegisterMessage(Evidence)

DESCRIPTOR._options = None
# @@protoc_insertion_point(module_scope)
import json

from evidence import evidence_pb2


# Map an internal Evidence object onto the generated protobuf message.
def create_grpc_message(ev):
    ev_grpc = evidence_pb2.Evidence()
    ev_grpc.id = ev.evidence_id
    # `timestamp` is a google.protobuf.Timestamp; parse the RFC 3339 string
    # produced by get_timestamp().
    ev_grpc.timestamp.FromJsonString(ev.timestamp)
    ev_grpc.resource_id = ev.resource_id
    if ev.tool:
        ev_grpc.service_id = ev.tool
    if ev.resource_type:
        # `resource` is a google.protobuf.Value; store the type as its string value.
        ev_grpc.resource.string_value = ev.resource_type
    # TODO: `applicable_metrics` is a repeated int32 field, while the internal
    # measurement_result is a string ("true"/"false"); a proper mapping is still needed.
    ev_grpc.raw = json.dumps(ev.raw)
    return ev_grpc
syntax = "proto3";

import "google/protobuf/struct.proto";
import "google/protobuf/timestamp.proto";

option go_package = "evidence";

// TODO: Adapt to the final Evidence structure.
// Copied from https://github.com/clouditor/clouditor/blob/main/proto/evidence.proto
message Evidence {
  string id = 1;
  string service_id = 2;
  string resource_id = 3;

  // TODO: replace with google/type/date.proto timestamp.proto or date.proto?
  google.protobuf.Timestamp timestamp = 4;

  repeated int32 applicable_metrics = 5;

  // "raw" evidence (for the auditor), for example the raw JSON response from
  // the API. This does not follow a defined schema.
  string raw = 6;

  // Optional; a semantic representation of the Cloud resource according to our
  // defined ontology: a JSON-serialized node of our semantic graph. This may be
  // Clouditor-specific.
  google.protobuf.Value resource = 7;
}
elasticsearch==7.13.4
urllib3==1.25.8
redis==3.5.3
elasticsearch_dsl==7.4.0
rq==1.10.0
rq_scheduler==0.11.0
import json

from redis import Redis
from rq import Queue
from rq_scheduler import Scheduler

from wazuh_evidence_collector import wazuh_evidence_collector

with open('constants.json') as f:
    constants = json.load(f)


# Cancel all jobs currently registered with the scheduler.
def remove_jobs(scheduler):
    jobs = scheduler.get_jobs()
    for job in jobs:
        scheduler.cancel(job)


# Print all jobs currently registered with the scheduler.
def print_jobs(scheduler):
    jobs = scheduler.get_jobs()
    for job in jobs:
        print(job)


redis = Redis(constants['redis']['ip'], constants['redis']['port'])
q = Queue(constants['redis']['queue'], connection=redis)
scheduler = Scheduler(connection=redis)

# TODO: Remove if needed
remove_jobs(scheduler)

# TODO: Change cron expression and repeat value for the production version.
# Should probably be "0 0 * * *".
scheduler.cron(
    '* * * * *',
    func=wazuh_evidence_collector.run_full_check,
    args=[],
    repeat=10,
    queue_name=constants['redis']['queue'],
    use_local_timezone=False
)

# TODO: Remove if needed
print_jobs(scheduler)
import json

import urllib3


class WazuhClient:
    def __init__(self, ip, port, username, password):
        self._ip = ip
        self._port = port
        self._username = username
        self._password = password
        self._auth_token = None

    def req(self, method, resource, data=None, headers=None, auth_retry=True):
        # TODO: add cert verification
        c = urllib3.HTTPSConnectionPool(self._ip, port=self._port, cert_reqs='CERT_NONE', assert_hostname=False)
        url = "https://%s:%i/%s" % (self._ip, self._port, resource)
        headers = headers or {}
        headers['Content-Type'] = 'application/json'
        if self._auth_token:
            headers['Authorization'] = 'Bearer %s' % self._auth_token
        resp = c.request(method, url, headers=headers, body=data)
        if resp.status == 401:
            if not auth_retry:
                raise Exception("Authentication Error")
            # Token expired or missing; log in again and retry once.
            self._auth_token = None
            self._login()
            return self.req(method, resource, data, headers, auth_retry=False)
        return json.loads(resp.data)

    def _login(self):
        login_endpoint = 'security/user/authenticate'
        basic_auth = "%s:%s" % (self._username, self._password)
        resp = self.req('GET', login_endpoint, headers=urllib3.make_headers(basic_auth=basic_auth), auth_retry=False)
        self._auth_token = resp['data']['token']
import json
import pprint
from datetime import datetime
from random import randint
from sys import maxsize

from elasticsearch import Elasticsearch
from elasticsearch_dsl import Search

from evidence.evidence import Evidence, simple_evidence
from wazuh_evidence_collector.wazuh_client import WazuhClient

with open('constants.json') as f:
    constants = json.load(f)

wc = WazuhClient(constants['wazuh']['ip'], constants['wazuh']['port'], constants['wazuh']['username'], constants['wazuh']['password'])

es = Elasticsearch(
    constants['elastic']['ip'],
    http_auth=(constants['elastic']['username'], constants['elastic']['password']),
    scheme='https',
    port=constants['elastic']['port'],
    use_ssl=False,
    verify_certs=False,
    ssl_show_warn=False,
)


# TODO: Get real data.
# Get (temporary) evidence ID.
def get_id(req_id):
    return req_id + '-' + str(randint(0, maxsize))


# Get timestamp (can be changed according to our preferences).
def get_timestamp():
    ts = datetime.utcnow()
    return ts.strftime('%Y-%m-%dT%H:%M:%SZ')


# Wrapper function that runs all the checks (for every manager/agent).
def run_full_check():
    # Get the list of all agent IDs (including the manager's).
    def get_agents(wc):
        body = wc.req('GET', 'agents')
        agents_ids = []
        for agent in body['data']['affected_items']:
            agents_ids.append(agent['id'])
        return body, agents_ids

    body, agents_ids = get_agents(wc)
    agent_evidences = []

    for agent in agents_ids:
        agent_evidences.append(wazuh_monitoring_enabled(wc, agent))
        agent_evidences.append(malware_protection_enabled(wc, es, agent))

    # TODO: Remove for production. This output is only for easier local testing.
    for evidence in agent_evidences:
        pprint.pprint(evidence.__dict__)

    return agent_evidences


# Check Wazuh's configuration.
def wazuh_monitoring_enabled(wc, agent_id):
    # Check if syscheck is enabled.
    def check_syscheck(wc, agent_id):
        body = wc.req('GET', 'agents/' + agent_id + '/config/syscheck/syscheck')
        measurement_result = body['data']['syscheck']['disabled'] == 'no'
        return body, measurement_result

    # Check if rootcheck is enabled.
    def check_rootcheck(wc, agent_id):
        body = wc.req('GET', 'agents/' + agent_id + '/config/syscheck/rootcheck')
        measurement_result = body['data']['rootcheck']['disabled'] == 'no'
        return body, measurement_result

    # Check if there's at least one valid alerting service.
    def check_alert_integrations(wc):
        body = wc.req('GET', 'manager/configuration')

        # Check email notifications integration.
        try:
            email_notifications = body['data']['affected_items'][0]['global']['email_notification'] == 'yes'
        except (KeyError, IndexError):
            email_notifications = False

        # Check Slack and PagerDuty notifications integration.
        try:
            integrations = body['data']['affected_items'][0]['integration']
            slack_notifications = pagerduty_notifications = False
            for integration in integrations:
                if integration['name'] == 'slack':
                    slack_notifications = True
                if integration['name'] == 'pagerduty':
                    pagerduty_notifications = True
        except (KeyError, IndexError):
            slack_notifications = pagerduty_notifications = False

        measurement_result = email_notifications or slack_notifications or pagerduty_notifications
        return body, measurement_result

    raw_evidence = []

    evidence, result_syscheck = check_syscheck(wc, agent_id)
    raw_evidence.append(evidence)
    evidence, result_rootcheck = check_rootcheck(wc, agent_id)
    raw_evidence.append(evidence)
    evidence, result_alert_integration = check_alert_integrations(wc)
    raw_evidence.append(evidence)

    if result_syscheck and result_rootcheck and result_alert_integration:
        return simple_evidence(get_id('05.3'), get_timestamp(), agent_id, "true", raw_evidence)
    else:
        return simple_evidence(get_id('05.3'), get_timestamp(), agent_id, "false", raw_evidence)


# Check if the agent uses ClamAV or VirusTotal.
def malware_protection_enabled(wc, es, agent_id):
    # Check for VirusTotal integration.
    def check_virus_total_integration(wc):
        body = wc.req('GET', 'manager/configuration')
        try:
            integrations = body['data']['affected_items'][0]['integration']
            measurement_result = False
            for integration in integrations:
                if integration['name'] == 'virustotal':
                    measurement_result = True
                    break
        except (KeyError, IndexError):
            measurement_result = False
        return body, measurement_result

    # Check if the ClamAV daemon process is running.
    def check_clamd_process(wc, agent_id):
        body = wc.req('GET', 'syscollector/' + agent_id + '/processes')
        measurement_result = False
        for package in body['data']['affected_items']:
            if package['name'] == 'clamd':
                measurement_result = True
                break
        return body, measurement_result

    # Check ClamAV logs in Elasticsearch.
    def check_clamd_logs_elastic(es, agent_id):
        s = Search(using=es, index="wazuh-alerts-*") \
            .query("match", predecoder__program_name="clamd") \
            .query("match", rule__description="Clamd restarted") \
            .query("match", agent__id=agent_id)
        body = s.execute().to_dict()
        measurement_result = len(body['hits']['hits']) > 0
        return body, measurement_result

    raw_evidence = []

    evidence, result_virus_total = check_virus_total_integration(wc)
    raw_evidence.append(evidence)
    evidence, result_clamd_process = check_clamd_process(wc, agent_id)
    raw_evidence.append(evidence)
    evidence, result_clamd_logs = check_clamd_logs_elastic(es, agent_id)
    raw_evidence.append(evidence)

    if result_virus_total or (result_clamd_process and result_clamd_logs):
        return simple_evidence(get_id('05.4'), get_timestamp(), agent_id, "true", raw_evidence)
    else:
        return simple_evidence(get_id('05.4'), get_timestamp(), agent_id, "false", raw_evidence)


# Check last Syscheck & Rootcheck scan times.
# TODO: When producing 'real' evidence, make sure to differentiate between Syscheck and Rootcheck outputs.
def check_last_scan_time(wc, agent_id):
    body = wc.req('GET', 'syscheck/' + agent_id + '/last_scan')
    measurement_result = body['data']['affected_items'][0]['end']
    evidence1 = simple_evidence(get_id('05.4'), get_timestamp(), agent_id, measurement_result, body)

    body = wc.req('GET', 'rootcheck/' + agent_id + '/last_scan')
    measurement_result = body['data']['affected_items'][0]['end']
    evidence2 = simple_evidence(get_id('05.4'), get_timestamp(), agent_id, measurement_result, body)

    return evidence1, evidence2