-
Notifications
You must be signed in to change notification settings - Fork 1
/
hdp_sensor_unit_tests.py
41 lines (33 loc) · 1.19 KB
/
hdp_sensor_unit_tests.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
import unittest
from unittest import mock
from airflow import DAG, AirflowException
from airflow.utils import dates
from sensors.azure_hortonworks_livyspark import AzureLivySpark
from sensors.azure_hortonworks_webhcat_hive import AzureWebHCatHive
class HdpSensorTest(unittest.TestCase):
def setUp(self):
args = {
'owner': 'airflow',
'start_date': dates.days_ago(1)
}
self.dag = DAG('test_dag_id', default_args=args)
@mock.patch('hooks.hortonworks_ambari_hook.HdpAmbariHook', autospec=True)
def test_poke_hive(self, mock_hook):
sensor = AzureWebHCatHive(
job_id="123",
task_id='hive_sensor',
dag=self.dag,
check_options={'timeout': 5}
)
with self.assertRaises(AirflowException):
sensor.poke(None)
@mock.patch('hooks.hortonworks_ambari_hook.HdpAmbariHook', autospec=True)
def test_poke_spark(self, mock_hook):
sensor = AzureLivySpark(
job_id="123",
task_id='spark_sensor',
dag=self.dag,
check_options={'timeout': 5}
)
with self.assertRaises(AirflowException):
sensor.poke(None)