Merge pull request #830 from MolSSI/rm_log

Remove server stats & compute manager logs

bennybp authored May 7, 2024
2 parents e16b51f + 0007c71 commit 56b9189
Showing 23 changed files with 117 additions and 687 deletions.
1 change: 0 additions & 1 deletion docs/source/user_guide/connecting_qcportal.rst
@@ -100,7 +100,6 @@ the server administrator wishes to include.
'get_managers': 1000,
'manager_tasks_claim': 200,
'manager_tasks_return': 10,
'get_server_stats': 25,
'get_access_logs': 1000,
'get_error_logs': 100,
'get_internal_jobs': 1000},
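These advertised limits cap how many items a single API call may touch. As a generic illustration only (not QCPortal code, all names are placeholders), a caller might batch a large ID list so each request stays under a given limit such as 'get_managers': 1000:

# Generic client-side sketch; 'api_limits' and 'manager_ids' are placeholder names.
from typing import Iterable, List, TypeVar

T = TypeVar("T")

def batched(items: List[T], limit: int) -> Iterable[List[T]]:
    # Yield successive slices no longer than 'limit'
    for start in range(0, len(items), limit):
        yield items[start : start + limit]

api_limits = {"get_managers": 1000, "manager_tasks_claim": 200}
manager_ids = list(range(2500))
for chunk in batched(manager_ids, api_limits["get_managers"]):
    pass  # issue one request per chunk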
2 changes: 0 additions & 2 deletions qcarchivetesting/qcarchivetesting/testing_classes.py
@@ -130,7 +130,6 @@ def __init__(
"get_dataset_entries": 5,
"get_molecules": 11,
"get_managers": 10,
"get_server_stats": 10,
"get_error_logs": 10,
"get_access_logs": 10,
}
@@ -144,7 +143,6 @@ def __init__(
qcf_config["loglevel"] = "DEBUG"
qcf_config["heartbeat_frequency"] = 3
qcf_config["heartbeat_max_missed"] = 2
qcf_config["statistics_frequency"] = 3

qcf_config["database"] = {"pool_size": 0}
qcf_config["log_access"] = log_access
@@ -0,0 +1,52 @@
"""Remove server stats log
Revision ID: 73b4838a6839
Revises: 75b80763e901
Create Date: 2024-05-06 10:54:44.383709
"""

import sqlalchemy as sa
from alembic import op
from sqlalchemy.dialects import postgresql

# revision identifiers, used by Alembic.
revision = "73b4838a6839"
down_revision = "75b80763e901"
branch_labels = None
depends_on = None


def upgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.drop_index("ix_server_stats_log_timestamp", table_name="server_stats_log", postgresql_using="brin")
op.drop_table("server_stats_log")

op.execute("DELETE FROM internal_jobs WHERE name = 'update_server_stats'")
# ### end Alembic commands ###


def downgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.create_table(
"server_stats_log",
sa.Column("id", sa.INTEGER(), autoincrement=True, nullable=False),
sa.Column("timestamp", postgresql.TIMESTAMP(timezone=True), autoincrement=False, nullable=False),
sa.Column("collection_count", sa.INTEGER(), autoincrement=False, nullable=True),
sa.Column("molecule_count", sa.BIGINT(), autoincrement=False, nullable=True),
sa.Column("record_count", sa.BIGINT(), autoincrement=False, nullable=True),
sa.Column("outputstore_count", sa.BIGINT(), autoincrement=False, nullable=True),
sa.Column("access_count", sa.BIGINT(), autoincrement=False, nullable=True),
sa.Column("db_total_size", sa.BIGINT(), autoincrement=False, nullable=True),
sa.Column("db_table_size", sa.BIGINT(), autoincrement=False, nullable=True),
sa.Column("db_index_size", sa.BIGINT(), autoincrement=False, nullable=True),
sa.Column("db_table_information", postgresql.JSON(astext_type=sa.Text()), autoincrement=False, nullable=True),
sa.Column("error_count", sa.BIGINT(), autoincrement=False, nullable=True),
sa.Column("service_queue_status", postgresql.JSON(astext_type=sa.Text()), autoincrement=False, nullable=True),
sa.Column("task_queue_status", postgresql.JSON(astext_type=sa.Text()), autoincrement=False, nullable=True),
sa.PrimaryKeyConstraint("id", name="server_stats_log_pkey"),
)
op.create_index(
"ix_server_stats_log_timestamp", "server_stats_log", ["timestamp"], unique=False, postgresql_using="brin"
)
# ### end Alembic commands ###
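For reference, a migration like the one above can be applied or rolled back through Alembic's programmatic command API; this is only a minimal sketch with placeholder paths and connection URL, not QCFractal's own upgrade tooling:

# Minimal sketch: apply or roll back Alembic revisions programmatically.
# "alembic.ini" and the database URL below are placeholders.
from alembic import command
from alembic.config import Config

cfg = Config("alembic.ini")
cfg.set_main_option("sqlalchemy.url", "postgresql://user:pass@localhost/qcfractal")

command.upgrade(cfg, "head")              # apply all pending revisions, including 73b4838a6839
# command.downgrade(cfg, "75b80763e901")  # step back to the previous revision if needed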
@@ -0,0 +1,48 @@
"""Remove compute manager log
Revision ID: 75b80763e901
Revises: f31c7897345f
Create Date: 2024-05-06 10:08:30.711531
"""

import sqlalchemy as sa
from alembic import op
from sqlalchemy.dialects import postgresql

# revision identifiers, used by Alembic.
revision = "75b80763e901"
down_revision = "f31c7897345f"
branch_labels = None
depends_on = None


def upgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.drop_index("ix_compute_manager_log_manager_id", table_name="compute_manager_log")
op.drop_table("compute_manager_log")
# ### end Alembic commands ###


def downgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.create_table(
"compute_manager_log",
sa.Column("id", sa.INTEGER(), autoincrement=True, nullable=False),
sa.Column("manager_id", sa.INTEGER(), autoincrement=False, nullable=False),
sa.Column("timestamp", postgresql.TIMESTAMP(timezone=True), autoincrement=False, nullable=False),
sa.Column("successes", sa.INTEGER(), autoincrement=False, nullable=False),
sa.Column("claimed", sa.INTEGER(), autoincrement=False, nullable=False),
sa.Column("failures", sa.INTEGER(), autoincrement=False, nullable=False),
sa.Column("total_cpu_hours", sa.DOUBLE_PRECISION(precision=53), autoincrement=False, nullable=False),
sa.Column("active_tasks", sa.INTEGER(), autoincrement=False, nullable=False),
sa.Column("active_cores", sa.INTEGER(), autoincrement=False, nullable=False),
sa.Column("active_memory", sa.DOUBLE_PRECISION(precision=53), autoincrement=False, nullable=False),
sa.Column("rejected", sa.INTEGER(), autoincrement=False, nullable=False),
sa.ForeignKeyConstraint(
["manager_id"], ["compute_manager.id"], name="compute_manager_log_manager_id_fkey", ondelete="CASCADE"
),
sa.PrimaryKeyConstraint("id", name="compute_manager_log_pkey"),
)
op.create_index("ix_compute_manager_log_manager_id", "compute_manager_log", ["manager_id"], unique=False)
# ### end Alembic commands ###
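After upgrading, one way to confirm that both dropped tables are really gone is SQLAlchemy's runtime inspector; a minimal sketch with a placeholder connection URL:

# Minimal sketch: check that the dropped tables no longer exist (SQLAlchemy 1.4+ inspector API).
import sqlalchemy as sa

engine = sa.create_engine("postgresql://user:pass@localhost/qcfractal")
inspector = sa.inspect(engine)

for table in ("server_stats_log", "compute_manager_log"):
    print(table, "exists:", inspector.has_table(table))  # expect False after the upgrade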
1 change: 0 additions & 1 deletion qcfractal/qcfractal/components/auth/role_socket.py
@@ -73,7 +73,6 @@
"Resource": [
"/api/v1/roles",
"/api/v1/managers",
"/api/v1/server_stats",
"/api/v1/server_errors",
"/api/v1/access_logs",
"/api/v1/tasks",
35 changes: 0 additions & 35 deletions qcfractal/qcfractal/components/managers/db_models.py
@@ -19,34 +19,6 @@
from qcportal.utils import now_at_utc


class ComputeManagerLogORM(BaseORM):
"""
Table for storing manager logs
This contains information about a manager at a particular point in time. This table
is periodically appended to, with updated information about a manager.
"""

__tablename__ = "compute_manager_log"

id = Column(Integer, primary_key=True)
manager_id = Column(Integer, ForeignKey("compute_manager.id", ondelete="cascade"), nullable=False)

timestamp = Column(TIMESTAMP(timezone=True), default=now_at_utc, nullable=False)

claimed = Column(Integer, nullable=False)
successes = Column(Integer, nullable=False)
failures = Column(Integer, nullable=False)
rejected = Column(Integer, nullable=False)

active_tasks = Column(Integer, nullable=False, default=0)
active_cores = Column(Integer, nullable=False, default=0)
active_memory = Column(Float, nullable=False, default=0.0)
total_cpu_hours = Column(Float, nullable=False, default=0.0)

__table_args__ = (Index("ix_compute_manager_log_manager_id", "manager_id"),)


class ComputeManagerORM(BaseORM):
"""
Table for storing information about active and inactive compute managers
@@ -81,13 +53,6 @@ class ComputeManagerORM(BaseORM):
manager_version = Column(String, nullable=False)
programs = Column(JSON, nullable=False)

log = relationship(
ComputeManagerLogORM,
order_by=ComputeManagerLogORM.timestamp.desc(),
cascade="all, delete-orphan",
passive_deletes=True,
)

__table_args__ = (
Index("ix_compute_manager_status", "status"),
Index("ix_compute_manager_modified_on", "modified_on", postgresql_using="brin"),
45 changes: 3 additions & 42 deletions qcfractal/qcfractal/components/managers/socket.py
@@ -11,7 +11,7 @@
from qcportal.exceptions import MissingDataError, ComputeManagerError
from qcportal.managers import ManagerStatusEnum, ManagerName, ManagerQueryFilters
from qcportal.utils import now_at_utc
from .db_models import ComputeManagerLogORM, ComputeManagerORM
from .db_models import ComputeManagerORM

if TYPE_CHECKING:
from sqlalchemy.orm.session import Session
@@ -60,26 +60,6 @@ def add_internal_job_check_heartbeats(self, delay: float, *, session: Optional[S
session=session,
)

@staticmethod
def save_snapshot(orm: ComputeManagerORM):
"""
Saves the statistics of a manager to its log entries
"""

log_orm = ComputeManagerLogORM(
claimed=orm.claimed,
successes=orm.successes,
failures=orm.failures,
rejected=orm.rejected,
active_tasks=orm.active_tasks,
active_cores=orm.active_cores,
active_memory=orm.active_memory,
total_cpu_hours=orm.total_cpu_hours,
timestamp=orm.modified_on,
)

orm.log.append(log_orm)

def activate(
self,
name_data: ManagerName,
@@ -141,13 +121,12 @@ def update_resource_stats(
session: Optional[Session] = None,
):
"""
Updates the resources available/in use by a manager, and saves it to its log entries
Updates the resources available/in use by a manager
"""

with self.root_socket.optional_session(session) as session:
stmt = (
select(ComputeManagerORM)
.options(selectinload(ComputeManagerORM.log))
.where(ComputeManagerORM.name == name)
.with_for_update(skip_locked=False)
)
@@ -164,8 +143,6 @@
manager.total_cpu_hours = total_cpu_hours
manager.modified_on = now_at_utc()

self.save_snapshot(manager)

def deactivate(
self,
name: Optional[Iterable[str]] = None,
@@ -347,20 +324,4 @@ def check_manager_heartbeats(self, session: Session, job_progress: JobProgress)
dead_managers = self.deactivate(modified_before=dt, reason="missing heartbeat", session=session)

if dead_managers:
self._logger.info(f"Deactivated {len(dead_managers)} managers due to missing heartbeats")

####################################################
# Some stuff to be retrieved for managers
####################################################

def get_log(self, name: str, *, session: Optional[Session] = None) -> List[Dict[str, Any]]:
stmt = select(ComputeManagerORM)
stmt = stmt.options(defer("*"), lazyload("*"))
stmt = stmt.options(joinedload(ComputeManagerORM.log).options(undefer("*")))
stmt = stmt.where(ComputeManagerORM.name == name)

with self.root_socket.optional_session(session) as session:
rec = session.execute(stmt).unique().scalar_one_or_none()
if rec is None:
raise MissingDataError(f"Cannot find manager {name}")
return [x.model_dict() for x in rec.log]
self._logger.info(f"Deactivated {len(dead_managers)} managers due to missing heartbeats")
2 changes: 0 additions & 2 deletions qcfractal/qcfractal/components/managers/test_client.py
@@ -52,7 +52,6 @@ def test_manager_client_get(snowflake: QCATestingSnowflake):
assert manager[1].modified_on > time_0
assert manager[1].created_on < time_1
assert manager[1].modified_on < time_1
assert manager[1].log is not None

assert manager[0].name == name2
assert manager[0].tags == ["tag1"]
@@ -61,7 +60,6 @@
assert manager[0].modified_on > time_1
assert manager[0].created_on < time_2
assert manager[0].modified_on < time_2
assert manager[0].log is not None

assert manager[2].id == manager[1].id
assert manager[3].id == manager[0].id
@@ -57,7 +57,6 @@ def test_manager_client_query(queryable_managers_client: PortalClient):
query_res = queryable_managers_client.query_managers(manager_id=[managers[0].id, managers[1].id])
query_res_l = list(query_res)
assert len(query_res_l) == 2
assert all(x.log is not None for x in query_res_l)


def test_manager_client_query_empty_iter(queryable_managers_client: PortalClient):
19 changes: 0 additions & 19 deletions qcfractal/qcfractal/components/managers/test_manager_client.py
@@ -235,15 +235,13 @@ def test_manager_mclient_heartbeat(snowflake: QCATestingSnowflake):

client = snowflake.client()
manager = client.get_managers(name1)
assert len(manager.log) == 0

# Now do a heartbeat
mclient1.heartbeat(total_cpu_hours=5.678, active_tasks=3, active_cores=10, active_memory=3.45)

time_2 = now_at_utc()

manager = client.get_managers(name1)
assert len(manager.log) == 1

# Was the data stored in the manager
assert manager.total_cpu_hours == 5.678
@@ -253,14 +251,6 @@
assert manager.modified_on > time_1
assert manager.modified_on < time_2

# and the log
log = manager.log[0]
assert log.total_cpu_hours == 5.678
assert log.active_tasks == 3
assert log.active_cores == 10
assert log.active_memory == 3.45
assert log.timestamp == manager.modified_on

# Now do another heartbeat
mclient1.heartbeat(
total_cpu_hours=2 * 5.678,
@@ -272,7 +262,6 @@
time_3 = now_at_utc()

manager = client.get_managers(name1)
assert len(manager.log) == 2

# Was the data stored in the manager
assert manager.total_cpu_hours == 2 * 5.678
@@ -282,14 +271,6 @@
assert manager.modified_on > time_2
assert manager.modified_on < time_3

# and the log
log = manager.log[0]
assert log.total_cpu_hours == 2 * 5.678
assert log.active_tasks == 2 * 3
assert log.active_cores == 2 * 10
assert log.active_memory == 2 * 3.45
assert log.timestamp == manager.modified_on


def test_manager_mclient_heartbeat_deactivated(snowflake: QCATestingSnowflake):
mname1 = ManagerName(cluster="test_cluster", hostname="a_host", uuid="1234-5678-1234-5678")
34 changes: 0 additions & 34 deletions qcfractal/qcfractal/components/serverinfo/db_models.py
@@ -110,40 +110,6 @@ def model_dict(self, exclude: Optional[Iterable[str]] = None) -> Dict[str, Any]:
return d


class ServerStatsLogORM(BaseORM):
"""
Table for storing server statistics
Server statistics (storage size, row count, etc) are periodically captured and
stored in this table
"""

__tablename__ = "server_stats_log"

id = Column(Integer, primary_key=True)
timestamp = Column(TIMESTAMP(timezone=True), nullable=False, default=now_at_utc)

# Raw counts
collection_count = Column(Integer)
molecule_count = Column(BigInteger)
record_count = Column(BigInteger)
outputstore_count = Column(BigInteger)
access_count = Column(BigInteger)
error_count = Column(BigInteger)

# Task & service queue status
task_queue_status = Column(JSON)
service_queue_status = Column(JSON)

# Database
db_total_size = Column(BigInteger)
db_table_size = Column(BigInteger)
db_index_size = Column(BigInteger)
db_table_information = Column(JSON)

__table_args__ = (Index("ix_server_stats_log_timestamp", "timestamp", postgresql_using="brin"),)


class MessageOfTheDayORM(BaseORM):
"""
Table for storing the Message-of-the-Day