Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ COPY realm_api /app/realm_api

RUN <<-EOF
mkdir -p /app/data
pip install --no-cache -Ue .
pip install --no-cache --no-warn-script-location -Ue .
EOF

ENTRYPOINT [ "/usr/local/bin/python", "-m", "realm_api" ]
61 changes: 0 additions & 61 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
"@semantic-release/git": "^10.0.1",
"@semantic-release/github": "^11.0.1",
"commitlint": "^17.7.1",
"commitlint-config-gitmoji": "^2.3.1",
"gitmoji-cli": "^8.5.0",
"husky": "^9.1.7",
"nano-staged": "^0.8.0",
Expand Down
10 changes: 6 additions & 4 deletions realm_api/rpc/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from uuid import uuid4

# 3rd party
from pydantic import BaseModel
from redis.asyncio import StrictRedis

# local
Expand All @@ -26,21 +27,22 @@ async def init_pubsub() -> aio.Task:


async def shutdown_pubsub(task: aio.Task):
pubsub.unsubscribe()
await pubsub.unsubscribe()
await pubsub.close()
task.cancel()


def handler(message: dict):
async def handler(message: dict):
"""Handle an incoming RPC operation and publish the result."""

data: dict = json.loads(message["data"])
logger.info(f"RPC op: {data['op']}")
result = handlers[data["op"]](
result: BaseModel = await handlers[data["op"]](
*data.get("args", []),
**data.get("kwargs", dict()),
)
redis_conn.publish(data["uuid"], json.dumps(result))
response = result.model_dump_json()
await redis_conn.publish(data["uuid"], response)


async def rpc_bot(op: str, *args, timeout=3, **kwargs):
Expand Down
7 changes: 0 additions & 7 deletions realm_api/rpc/roll.py

This file was deleted.

130 changes: 130 additions & 0 deletions realm_api/rpc/roll/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
"""Dice roller RPC module"""

# stdlib
from random import randint
from re import compile

# api
from realm_schema import (
BatchResults,
ConstantModifier,
DiceRoll,
RollResults,
RollSegment,
SegmentResult,
)

# local
from realm_api.logging import logger
from .parse import parse_segments


explode_regex = compile(r"!(?P<num>\d*)")
"""Regex for exploding die extra"""

keep_hilow_regex = compile(r"(?P<fun>k[hl])(?P<num>\d*)")
"""Regex for keep highest/lowest extras"""


def roll_segment(segment: RollSegment) -> SegmentResult:
"""Calculate the results of an individual RollSegment instance."""

if isinstance(segment, ConstantModifier):
return SegmentResult(
segment=segment,
total=segment.number * (-1 if segment.negative else 1),
)

if isinstance(segment, DiceRoll):
limit = 0
exploding = False
result = SegmentResult(segment=segment, rolls=[], work="")
assert result.rolls is not None
assert result.work is not None

if segment.extra and segment.extra[0] == "!":
exploding = True

# explosion limit
if len(segment.extra) > 1:
explode_limit_match = explode_regex.match(segment.extra)
assert explode_limit_match
limit = int(explode_limit_match.groupdict()["num"])

# roll dice
for _ in range(segment.dice):
roll = randint(1, segment.faces)
exploded = 0

# exploding die
while (
exploding
and roll == segment.faces
and (limit == 0 or exploded < limit)
):
exploded += 1
result.rolls.append(roll)
result.total += roll
roll = randint(1, segment.faces)

result.rolls.append(roll)
result.total += roll

# keep methods
if segment.extra and segment.extra.startswith("k"):
ord_rolls = sorted(result.rolls)
args_match = keep_hilow_regex.match(segment.extra)
assert args_match
args = args_match.groupdict()
fun = args["fun"]
keep = int(args["num"]) if args["num"] else 1

assert keep < segment.dice and keep > 0
drop = range(segment.dice - keep)

# keep highest
if fun == "kh":
for i in drop:
result.total -= ord_rolls[i]

# keep lowest
elif fun == "kl":
for i in drop:
result.total -= ord_rolls[-(i + 1)]

rolls = [
f"**{roll}**" if roll == segment.faces else f"{roll}"
for roll in result.rolls
]
result.work = "".join(
[
f"[{', '.join(rolls)}] = ",
f"**{'-' if segment.negative else ''}{result.total}**",
]
)

if segment.negative:
result.total *= -1

return result

raise NotImplementedError()


async def roll_handler(formula: str) -> BatchResults:
"""Roll them bones."""

results = BatchResults(results=[])

try:
batches = parse_segments(formula)

for segments in batches:
results.results.append(
RollResults(results=[roll_segment(s) for s in segments])
)
except Exception as ex:
logger.warning(ex)
return BatchResults(results=[])

return results
86 changes: 86 additions & 0 deletions realm_api/rpc/roll/parse.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
"""Parse segments from roll command arguments"""

# stdlib
from re import compile

# local
from realm_schema import ConstantModifier, DiceRoll, RollSegment

roll_regex = compile(
r"^(?P<all>"
r"(?P<num>\d*)"
r"d(?P<die>\d+)"
r"(?P<fun>!\d*|k[hl]\d*)?"
r")"
r"(x(?P<batch>\d+))?"
)
"""Regex pattern for initial roll"""

addl_roll_regex = compile(
r"(?P<all>"
r"(?P<op>[-+])"
r"(?P<num>\d*)"
r"(d(?P<die>\d+)(?P<fun>!\d*|k[hl]\d*)?)?"
r")"
r"(x(?P<batch>\d+))?"
)
"""Regex pattern for additional modifier rolls/constants"""


def parse_segments(roll: str) -> list[list[RollSegment]]:
"""Parse a roll formula string into a list of RollSegment objects."""

segments = []

# strip all whitespace
roll = roll.replace(" ", "")

# parse first roll
first = roll_regex.match(roll)
assert first
first = first.groupdict()

# parse variable number of mods
mods = [r.groupdict() for r in addl_roll_regex.finditer(roll)]

batch = int(first["batch"] or 1)

for m in mods:
if m["batch"]:
batch = int(m["batch"])

batches = []

for _ in range(batch):
segments: list[RollSegment] = [
DiceRoll(
raw=first["all"],
dice=int(first["num"] or "1"),
faces=int(first["die"]),
extra=first["fun"],
)
]

for mod in mods:
if mod["die"]:
segments.append(
DiceRoll(
raw=mod["all"],
negative=mod["op"] == "-",
dice=int(mod["num"] or "1"),
faces=int(mod["die"]),
extra=mod["fun"],
)
)
else:
segments.append(
ConstantModifier(
raw=mod["all"],
negative=mod["op"] == "-",
number=int(mod["num"]),
)
)

batches.append(segments)

return batches