This is a hands-on lab for implementing a STIX-shifter connector module from scratch. The main purpose is to gain experience developing a functional connector by recreating an existing one; we will mostly copy and paste the required code blocks and functions. We chose the MySQL connector because it is simpler to code than most of the existing connectors.
- GitHub account
- Basic knowledge of Git, such as forking, committing, branching, pulling, and merging.
- Working knowledge of the Python programming language. This lab uses Python 3.9.
- An IDE to write Python code, such as VS Code.
- Knowledge of the data source API that includes API request, response, datatype and schema.
- Knowledge of STIX 2.0. To learn about STIX Cyber Observable Objects, see the STIX 2.0 specification.
Create a Python 3.9 virtual environment:

```shell
virtualenv -p python3.9 virtualenv && source virtualenv/bin/activate
```

Upgrade pip (optional):

```shell
python3 -m pip install --upgrade pip
```

Install dependencies:

```shell
INSTALL_REQUIREMENTS_ONLY=1 python3 setup.py install
```
- You should now have a connector module skeleton for the new connector, named `lab_connector`.
- Implement the `EntryPoint()` class in `stix_shifter_modules/lab_connector/entry_point.py`.
- The EntryPoint class acts as a gateway to the various methods used by the translation and transmission classes.
```python
from stix_shifter_utils.utils.base_entry_point import BaseEntryPoint


class EntryPoint(BaseEntryPoint):

    def __init__(self, connection={}, configuration={}, options={}):
        super().__init__(connection, configuration, options)
        self.set_async(False)
        if connection:
            self.setup_transmission_basic(connection, configuration)
        self.setup_translation_simple(dialect_default='default')
```
8. Implement the input configuration of the connector in `stix_shifter_modules/lab_connector/configuration`.
- A JSON file needs to be created that contains the configuration parameters for the module. The configuration JSON file is required in order to validate the module-specific parameters for a successful translation and transmission call. Follow this naming convention when you create the file: `config.json`.
- Two top-level JSON objects need to be present in the file: `connection` and `configuration`.
- The child attributes of the `connection` object should be the parameters required for making API calls, which can be shared by multiple users and role levels.
- Here's an example of the `connection` object:
```json
"connection": {
    "type": {
        "displayName": "Lab Connector"
    },
    "host": {
        "type": "text",
        "regex": "^(([a-zA-Z0-9]|[a-zA-Z0-9][a-zA-Z0-9\\-]*[a-zA-Z0-9])\\.)*([A-Za-z0-9]|[A-Za-z0-9][A-Za-z0-9\\-]*[A-Za-z0-9])$"
    },
    "port": {
        "type": "number",
        "default": 3306,
        "min": 1,
        "max": 65535
    },
    "database": {
        "type": "text"
    },
    "help": {
        "type": "link",
        "default": "data-sources.html"
    },
    "options": {
        "table": {
            "type": "text",
            "optional": false
        }
    }
}
```
- The `configuration` object should contain the parameters that are required for API authentication for individual users and roles.
- Here's an example of the `configuration` object:
```json
"configuration": {
    "auth": {
        "type": "fields",
        "username": {
            "type": "password"
        },
        "password": {
            "type": "password"
        }
    }
}
```
- For this lab, copy the entire content from https://raw.githubusercontent.com/opencybersecurityalliance/stix-shifter/develop/stix_shifter_modules/mysql/configuration/config.json.
- A second JSON file is required to translate the parameters defined in `config.json` for the UI. This file helps the UI framework show the parameters in a human-readable format. For the English language, create a file named `lang_en.json`.

Here's an example of the content of a `lang_en.json` file:
```json
"configuration": {
    "auth": {
        "username": {
            "label": "Username",
            "description": "Username with access to the database"
        },
        "password": {
            "label": "Password",
            "description": "Password of the user with access to the database"
        }
    }
}
```
- For this lab, copy the entire content from https://raw.githubusercontent.com/opencybersecurityalliance/stix-shifter/develop/stix_shifter_modules/mysql/configuration/lang_en.json.

Note: For more details about the configuration JSON, go to Configuration JSON.
- Go to `stix_shifter_modules/lab_connector/stix_translation`.
- Edit the from_stix_map (`from_stix_map.json`) JSON file. The `from_stix_map.json` file contains the mapping of STIX objects to data source fields.
- The mapping of STIX objects and properties to data source fields determines how a STIX pattern is translated to a data source query.
- Update the `stix_shifter_modules/lab_connector/stix_translation/json/from_stix_map.json` file with the content of https://raw.githubusercontent.com/opencybersecurityalliance/stix-shifter/develop/stix_shifter_modules/mysql/stix_translation/json/from_stix_map.json

Note: If the data source API offers more than one schema type, a dialect prefix can be added. For example: `dialect1_from_stix_map.json`.
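To see the shape of such a mapping, here is a small illustrative fragment (not the full MySQL mapping) that ties a STIX object property to one or more data source columns, using the demo-table fields that appear later in this lab:

```json
{
    "ipv4-addr": {
        "fields": {
            "value": ["source_ipaddr", "dest_ipaddr"]
        }
    },
    "url": {
        "fields": {
            "value": ["url"]
        }
    }
}
```

With this fragment, a pattern such as `[ipv4-addr:value = '127.0.0.1']` would be expanded into predicates on both `source_ipaddr` and `dest_ipaddr`.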
- Edit the `operators.json` file: the operators.json file maps the STIX pattern operators to the data source query operators. Change the comparator values to match the operators supported in your data source.
- Update `stix_shifter_modules/lab_connector/stix_translation/json/operators.json` with the content of https://raw.githubusercontent.com/opencybersecurityalliance/stix-shifter/develop/stix_shifter_modules/mysql/stix_translation/json/operators.json
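As an illustration, an operators mapping for a SQL-style data source might look like the following fragment (a sketch, not the complete MySQL file):

```json
{
    "ComparisonExpressionOperators.And": "AND",
    "ComparisonExpressionOperators.Or": "OR",
    "ComparisonComparators.Equal": "=",
    "ComparisonComparators.NotEqual": "!=",
    "ComparisonComparators.Like": "LIKE"
}
```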
- The `QueryTranslator()` class in `stix_shifter_modules/lab_connector/stix_translation/query_translator.py` can be left as is.
- Edit the query constructor file `stix_shifter_modules/lab_connector/stix_translation/query_constructor.py`:
- The `query_constructor.py` file is where the native query is built from the ANTLR parsing.
- When a STIX pattern is translated by STIX-shifter, it is first parsed with ANTLR 4 into nested expression objects. The native data source query is constructed from these nested objects.
- The parsing is recursively run through `QueryStringPatternTranslator._parse_expression`, which is found in `query_constructor.py`.
- The main query constructor class and functions are already defined and implemented for this lab.
Test the query translation command using the CLI tool:

```shell
python main.py translate lab_connector query {} "[ipv4-addr:value = '127.0.0.1'] START t'2022-07-01T00:00:00.000Z' STOP t'2022-07-27T00:05:00.000Z'" '{"table":"demo_table"}'
```
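To make the construction concrete, here is a minimal plain-Python sketch of the core idea: each parsed STIX comparison is expanded into SQL predicates by combining the two JSON mappings. All names below are illustrative, not the actual stix-shifter classes or API.

```python
# Illustrative subsets of the two mapping files (hypothetical contents).
FROM_STIX_MAP = {"ipv4-addr:value": ["source_ipaddr", "dest_ipaddr"]}
OPERATORS = {"ComparisonComparators.Equal": "="}


def comparison_to_sql(stix_field, comparator, value):
    """Expand one STIX comparison into an OR of column predicates."""
    columns = FROM_STIX_MAP[stix_field]     # one STIX property -> many columns
    op = OPERATORS[comparator]              # STIX comparator -> SQL operator
    predicates = ["{} {} '{}'".format(col, op, value) for col in columns]
    return "(" + " OR ".join(predicates) + ")"


where = comparison_to_sql("ipv4-addr:value", "ComparisonComparators.Equal", "127.0.0.1")
query = "SELECT * FROM demo_table WHERE " + where
```

The real `_parse_expression` does this recursively over nested ANTLR expression objects, combining the resulting predicates with the mapped `AND`/`OR` operators.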
- You need to implement five functionalities of the transmission module: `ping`, `query`, `status`, `results`, and `delete`.
- First, create a class called `APIClient()` in `stix_shifter_modules/lab_connector/stix_transmission/api_client.py`. This is where you initialize the connection and configuration needed for the data source API requests. This class also includes the required data source API calls and utility functions.
Add the following code to the top of the API client:

```python
import aiomysql
from pymysql.err import DatabaseError


class APIClient():

    def __init__(self, connection, configuration):
        auth = configuration.get('auth')
        self.user = auth.get('username')
        self.password = auth.get('password')
        self.timeout = connection['options'].get('timeout')
        self.result_limit = connection['options'].get('result_limit')
        self.host = connection.get("host")
        self.database = connection.get("database")
        self.table = connection['options'].get("table")
        self.port = connection.get("port")
        self.auth_plugin = 'mysql_native_password'
```
- Create a file called `connector.py` if it doesn't yet exist, and add the following code to the top of the file:

```python
import datetime
import json
from stix_shifter_utils.modules.base.stix_transmission.base_json_sync_connector import BaseJsonSyncConnector
from .api_client import APIClient
from stix_shifter_utils.utils.error_response import ErrorResponder
from stix_shifter_utils.utils import logger


class Connector(BaseJsonSyncConnector):

    def __init__(self, connection, configuration):
        self.api_client = APIClient(connection, configuration)
        self.logger = logger.set_logger(__name__)
        self.connector = __name__.split('.')[1]
```
- Now we can add the required transmission functions.
- Define and implement a function named `ping_connection(self)` inside `stix_shifter_modules/lab_connector/stix_transmission/connector.py`:
```python
async def ping_connection(self):
    response = await self.api_client.ping_data_source()
    response_code = response.get('code')
    response_txt = response.get('message')
    return_obj = dict()
    return_obj['success'] = False

    if len(response) > 0 and response_code == 200:
        return_obj['success'] = True
    else:
        ErrorResponder.fill_error(return_obj, response, ['message'], error=response_txt, connector=self.connector)
    return return_obj
```
- Define and implement the `ping_data_source()` function inside the `APIClient()` class:
```python
async def ping_data_source(self):
    # Pings the data source
    response = {"code": 200, "message": "All Good!"}
    try:
        pool = await aiomysql.create_pool(host=self.host, port=self.port,
                                          user=self.user, password=self.password,
                                          db=self.database, connect_timeout=self.timeout)
        async with pool.acquire() as conn:
            async with conn.cursor() as cur:
                await cur.execute("SELECT 42;")
                (r,) = await cur.fetchone()
                assert r == 42
        pool.close()
        await pool.wait_closed()
    except DatabaseError as err:
        response["code"] = int(err.args[0])
        response["message"] = err
    except Exception as err:
        response["code"] = 'unknown'
        response["message"] = err
    return response
```
Test the Ping command using the CLI tool:

```shell
python main.py transmit lab_connector '{"host": "localhost", "database":"demo_db", "options": {"table":"demo_table"}}' '{"auth": {"username":"root", "password":""}}' ping
```
- As a synchronous connector, it doesn't require an API request to start or create the query, so there is no need to define and implement a query function. The `self.setup_transmission_basic(connection, configuration)` statement inside the `EntryPoint()` entry point class takes care of that automatically.
Test the Query command using the CLI tool:

```shell
python main.py transmit lab_connector '{"host": "localhost", "database":"demo_db", "options": {"table":"demo_table"}}' '{"auth": {"username":"root", "password":""}}' query "SELECT * FROM demo_table WHERE source_ipaddr = '10.0.0.9'"
```
- As with query, a synchronous connector doesn't return a status from the data source, so no action is needed.

Test the Status command using the CLI tool:

```shell
python main.py transmit lab_connector '{"host": "localhost", "database":"demo_db", "options": {"table":"demo_table"}}' '{"auth": {"username":"root", "password":""}}' status "SELECT * FROM demo_table WHERE source_ipaddr = '10.0.0.9'"
```
- Define and implement a function named `create_results_connection(self, query, offset, length)` inside `stix_shifter_modules/lab_connector/stix_transmission/connector.py`:
```python
async def create_results_connection(self, query, offset, length):
    return_obj = dict()
    response = await self.api_client.run_search(query, start=offset, rows=length)
    response_code = response.get('code')
    response_txt = response.get('message')

    if response_code == 200:
        return_obj['success'] = True
        return_obj['data'] = response.get('result')
    else:
        ErrorResponder.fill_error(return_obj, response, ['message'], error=response_txt, connector=self.connector)
    return return_obj
```
- Define and implement a function named `run_search(self, query, start, rows)` in the `APIClient()` class. (Note that `create_results_connection` calls it with the keyword arguments `start` and `rows`.)
- Copy the code block from the MySQL connector's `run_search` function.
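The essential piece of `run_search` for a SQL data source is pagination: the offset/length arguments of the results call become a MySQL `LIMIT` clause. Here is a hedged sketch of just that logic (the helper name is hypothetical; the real function executes the query with aiomysql and wraps the fetched rows in a `{"code": ..., "result": ...}`-style response):

```python
def build_paginated_query(query, offset, rows):
    """Append a MySQL LIMIT <offset>, <row_count> clause to the translated query."""
    return "{} LIMIT {}, {}".format(query.rstrip(";"), int(offset), int(rows))


paginated = build_paginated_query(
    "SELECT * FROM demo_table WHERE source_ipaddr = '10.0.0.9'", 0, 100)
```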
Test the Results command using the CLI tool:

```shell
python main.py transmit lab_connector '{"host": "localhost", "database":"demo_db", "options": {"table":"demo_table"}}' '{"auth": {"username":"root", "password":""}}' results "SELECT * FROM demo_table WHERE source_ipaddr = '10.0.0.9'" 0 100
```
- Deleting a query is not supported by the data source API, so no action is needed.

Test the Delete command using the CLI tool:

```shell
python main.py transmit lab_connector '{"host": "localhost", "database":"demo_db", "options": {"table":"demo_table"}}' '{"auth": {"username":"root", "password":""}}' delete "SELECT * FROM demo_table WHERE source_ipaddr = '10.0.0.9'"
```
- Make sure the data source returns the results in JSON format.
- Go to `stix_shifter_modules/lab_connector/stix_translation`.
- Create a JSON file named `to_stix_map.json` that maps data source fields to STIX objects.
- Details on the to_stix_map file can be found in Edit the to_stix_map JSON file.
- For this lab, update the `stix_shifter_modules/lab_connector/stix_translation/json/to_stix_map.json` file with the content of the `to_stix_map.json` file.
- Implement the `ResultsTranslator(JSONToStix)` class in `results_translator.py`:
```python
from stix_shifter_utils.stix_translation.src.json_to_stix.json_to_stix import JSONToStix


class ResultsTranslator(JSONToStix):
    pass
```
Note:
- If the data source results need some pre-processing before applying the mapping, this `ResultsTranslator()` class can be used. Otherwise, the `self.setup_translation_simple(dialect_default='default')` statement inside the `EntryPoint()` entry point class takes care of the results translation automatically. The parent utility class `JSONToStix` automatically translates the results into STIX.
- If the data source API offers more than one schema type, a dialect prefix can be added. For example: `dialect1_to_stix_map.json`.
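For illustration, a to_stix_map entry inverts the earlier direction: top-level keys are data source columns, and each maps to one or more STIX properties. The following fragment is a sketch based on the demo-table fields used in this lab, not the complete MySQL mapping:

```json
{
    "source_ipaddr": [
        {
            "key": "ipv4-addr.value",
            "object": "src_ip"
        }
    ],
    "url": [
        {
            "key": "url.value",
            "object": "url"
        }
    ]
}
```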
Test the results translation command using the CLI tool:

```shell
python main.py translate lab_connector results '{ "type":"identity","id":"identity--20a77a37-911e-468f-a165-28da7d02985b", "name":"MySQL Database", "identity_class":"system", "created": "2022-04-07T20:35:41.042Z", "modified": "2022-04-07T20:35:41.042Z" }' '[ { "source_ipaddr": "10.0.0.9", "dest_ipaddr": "10.0.0.9", "url": "www.example.org", "filename": "spreadsheet.doc", "sha256hash": "b0795d1f264efa26bf464612a95bba710c10d3de594d888b6282c48f15690459", "md5hash": "0a556fbb7d3c184fad0a625afccd2b62", "file_path": "C:/PHOTOS", "username": "root", "source_port": 143, "dest_port": 8080, "protocol": "udp", "entry_time": 1617123877.0, "system_name": "demo_system", "severity": 2, "magnitude": 1 } ]' '{"table":"demo_table"}'
```
12. Implement the `ErrorMapper()` class in `stix_shifter_modules/lab_connector/stix_transmission/error_mapper.py`.
- This is where you map any API-specific error codes and messages for the return object. You can use the same error mapper content that is used in the MySQL connector.
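Conceptually, the mapping boils down to a lookup table from MySQL server error codes to connector-level error categories, with a default for unknown codes. The sketch below uses plain string labels and a hypothetical helper name; the real ErrorMapper maps codes to `ErrorCode` values from `stix_shifter_utils` instead.

```python
# Illustrative mapping of real MySQL server error codes to generic labels.
ERROR_MAPPING = {
    1045: "authentication_fail",   # ER_ACCESS_DENIED_ERROR
    1049: "invalid_parameter",     # ER_BAD_DB_ERROR (unknown database)
    2003: "service_unavailable",   # CR_CONN_HOST_ERROR (can't connect to server)
}


def map_error_code(code, default="unknown"):
    """Return the connector-level error label for a MySQL error code."""
    return ERROR_MAPPING.get(code, default)
```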
13. Add any data-source-specific dependency to `stix_shifter_modules/lab_connector/requirements.txt`.
- In this case, add `aiomysql==0.1.1`.
Test the end-to-end flow with the execute command using the CLI tool:

```shell
python main.py execute lab_connector lab_connector '{"type": "identity","id": "identity--f431f809-377b-45e0-aa1c-6a4751cae5ff","name": "mysql","identity_class": "system"}' '{"host": "localhost", "database":"demo_db", "options": {"table":"demo_table"}}' '{"auth": {"username":"root", "password":""}}' "[ipv4-addr:value = '10.0.0.9']"
```