forked from MaterializeInc/materialize
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmzcompose.py
115 lines (102 loc) · 3.57 KB
/
mzcompose.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.
import requests
from materialize.mzcompose.composition import (
Composition,
Service,
WorkflowArgumentParser,
)
from materialize.mzcompose.services.debezium import Debezium
from materialize.mzcompose.services.kafka import Kafka
from materialize.mzcompose.services.materialized import Materialized
from materialize.mzcompose.services.metabase import Metabase
from materialize.mzcompose.services.mysql import MySql
from materialize.mzcompose.services.schema_registry import SchemaRegistry
from materialize.mzcompose.services.zookeeper import Zookeeper
SERVICES = [
Zookeeper(),
Kafka(auto_create_topics=True),
SchemaRegistry(),
Debezium(),
MySql(root_password="rootpw"),
Materialized(),
Metabase(),
Service(
name="chbench",
config={
"mzbuild": "chbenchmark",
"init": True,
"volumes": ["mydata:/gen"],
},
),
]
def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None:
"""Run CH-benCHmark without any load on Materialize"""
# Parse arguments.
parser.add_argument(
"--wait", action="store_true", help="wait for the load generator to exit"
)
args, unknown_args = parser.parse_known_args()
# Start Materialize.
c.up("materialized")
# Start MySQL and Debezium.
c.up("zookeeper", "kafka", "schema-registry", "mysql", "debezium")
# Generate initial data.
c.run(
"chbench",
"gen",
"--config-file-path=/etc/chbenchmark/mz-default-mysql.cfg",
"--warehouses=1",
)
# Start Debezium.
response = requests.post(
f"http://localhost:{c.default_port('debezium')}/connectors",
json={
"name": "mysql-connector",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"database.hostname": "mysql",
"database.port": "3306",
"database.user": "root",
"database.password": "rootpw",
"database.server.name": "debezium",
"database.server.id": "1234",
"database.history.kafka.bootstrap.servers": "kafka:9092",
"database.history.kafka.topic": "mysql-history",
"database.allowPublicKeyRetrieval": "true",
"time.precision.mode": "connect",
"topic.prefix": "mysql",
},
},
)
# Don't error if the connector already exists.
if response.status_code != requests.codes.conflict:
response.raise_for_status()
# Run load generator.
c.run(
"chbench",
"run",
"--config-file-path=/etc/chbenchmark/mz-default-mysql.cfg",
"--dsn=mysql",
"--gen-dir=/var/lib/mysql-files",
"--analytic-threads=0",
"--transactional-threads=1",
"--run-seconds=86400",
"--mz-sources",
*unknown_args,
detach=not args.wait,
)
def workflow_load_test(c: Composition) -> None:
"""Run CH-benCHmark with a selected amount of load against Materialize."""
c.workflow(
"default",
"--peek-conns=1",
"--mz-views=q01,q02,q05,q06,q08,q09,q12,q14,q17,q19",
"--transactional-threads=2",
)