Refactor ICMPStatusCheck run method

The ICMPStatusCheck was performed by shelling out
to the `ping` executable, by building a string for the
command and supplying it to `subprocess.Popen`.

This is dangerous because it allows for shell injections,
as the content of the command is partly user controlled:
one could set the instance address to `8.8.8.8; rm -rf /`,
which would be happily executed.

This is avoided by passing a list of strings to subprocess,
so that the arguments are all passed to ping. The above example
then results in `ping: bad address '8.8.8.8 ; rm -rf /'`

The issue was detected by the Bandit linter:
`B602:subprocess_popen_with_shell_equals_true`

Additionnally, we can simplify the flow by using `check_output`
while redirecting `stderr` to `STDOUT`, and catching the
`CalledProcessError` when the command fails.

This also adds some unit tests for this check.

See also: https://security.openstack.org/guidelines/dg_use-subprocess-securely.html
This commit is contained in:
Jean-Fred Berthelot 2017-09-28 15:02:16 +01:00 committed by Jean-Frédéric
parent 2e52785e6b
commit 5040c55443
2 changed files with 71 additions and 10 deletions

View File

@ -608,18 +608,14 @@ class ICMPStatusCheck(StatusCheck):
instances = self.instance_set.all()
target = self.instance_set.get().address
# We need to read both STDOUT and STDERR because ping can write to both, depending on the kind of error.
# Thanks a lot, ping.
ping_process = subprocess.Popen("ping -c 1 " + target, stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
shell=True)
response = ping_process.wait()
if response == 0:
args = ['ping', '-c', '1', target]
try:
# We redirect stderr to STDOUT because ping can write to both, depending on the kind of error.
subprocess.check_output(args, stderr=subprocess.STDOUT, shell=False)
result.succeeded = True
else:
output = ping_process.stdout.read()
except subprocess.CalledProcessError as e:
result.succeeded = False
result.error = output
result.error = e.output
return result

View File

@ -0,0 +1,65 @@
import subprocess
from mock import patch
from cabot.cabotapp.models import (
ICMPStatusCheck,
Instance,
Service,
)
from .tests_basic import LocalTestCase
class TestICMPCheckRun(LocalTestCase):
def setUp(self):
super(TestICMPCheckRun, self).setUp()
self.instance = Instance.objects.create(
name='Instance',
address='1.2.3.4'
)
self.icmp_check = ICMPStatusCheck.objects.create(
name='ICMP Check',
created_by=self.user,
importance=Service.CRITICAL_STATUS,
)
self.instance.status_checks.add(
self.icmp_check)
self.patch = patch('cabot.cabotapp.models.subprocess.check_output', autospec=True)
self.mock_check_output = self.patch.start()
def tearDown(self):
self.patch.stop()
super(TestICMPCheckRun, self).tearDown()
def test_icmp_run_use_instance_address(self):
self.icmp_check.run()
args = ['ping', '-c', '1', u'1.2.3.4']
self.mock_check_output.assert_called_once_with(args, shell=False, stderr=-2)
def test_icmp_run_success(self):
checkresults = self.icmp_check.statuscheckresult_set.all()
self.assertEqual(len(checkresults), 0)
self.icmp_check.run()
checkresults = self.icmp_check.statuscheckresult_set.all()
self.assertEqual(len(checkresults), 1)
self.assertTrue(self.icmp_check.last_result().succeeded)
def test_icmp_run_bad_address(self):
self.mock_check_output.side_effect = subprocess.CalledProcessError(2, None, "ping: bad address")
checkresults = self.icmp_check.statuscheckresult_set.all()
self.assertEqual(len(checkresults), 0)
self.icmp_check.run()
checkresults = self.icmp_check.statuscheckresult_set.all()
self.assertEqual(len(checkresults), 1)
self.assertFalse(self.icmp_check.last_result().succeeded)
def test_icmp_run_inacessible(self):
self.mock_check_output.side_effect = subprocess.CalledProcessError(1, None, "packet loss")
checkresults = self.icmp_check.statuscheckresult_set.all()
self.assertEqual(len(checkresults), 0)
self.icmp_check.run()
checkresults = self.icmp_check.statuscheckresult_set.all()
self.assertEqual(len(checkresults), 1)
self.assertFalse(self.icmp_check.last_result().succeeded)