stoq-clamav/clamav/clamav.py

60 lines
2.0 KiB
Python
Raw Permalink Normal View History

#!/usr/bin/env python3
# Copyright 2014-2018 PUNCH Cyber Analytics Group
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Overview
========
Scan attachments with ClamAV
"""
import clamd
from configparser import ConfigParser
from typing import List, Dict, Optional
from stoq.plugins import WorkerPlugin
from stoq import Payload, RequestMeta, WorkerResponse, ExtractedPayload, PayloadMeta
CLASS_ATTRIBUTES = [
'hashpayload', 'saveresults', 'daemon', 'interval', 'timeout', 'host', 'port', 'socket'
]
class SMTPPlugin(WorkerPlugin):
def __init__(self, config: ConfigParser, plugin_opts: Optional[Dict]) -> None:
super().__init__(config, plugin_opts)
self.hashpayload: bool = False
self.saveresults: bool = True
self.daemon: str = 'socket'
self.interval: int = 60
self.timeout: int = 10
self.host: str = '127.0.0.1'
self.port: int = 3310
self.socket: str = '/var/run/clamav/clamd.ctl'
for name in CLASS_ATTRIBUTES:
self._parse_option(plugin_opts, name)
def _parse_option(self, plugin_opts, name):
if plugin_opts and name in plugin_opts:
self[name] = plugin_opts[name]
elif config.has_option('options', name):
self[name] = config.getboolean('options', name)
def scan(self, payload: Payload, request_meta: RequestMeta) -> WorkerResponse:
message_json = '{"TEST":"TEST"}'
return WorkerResponse(message_json, errors=None, extracted=None)