Skip to content

Commit

Permalink
Fix resource manger alive keys always have a TTL (#1848)
Browse files Browse the repository at this point in the history
* alive key will always expire

* adjusted tests and added new test for alive key

* fixed wrong number of arguments

* making sure tests pass

Co-authored-by: Andrei Neagu <neagu@itis.swiss>
  • Loading branch information
GitHK and Andrei Neagu authored Sep 30, 2020
1 parent 0ef11c4 commit fe3ea41
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,10 @@

@attr.s(auto_attribs=True)
class RedisResourceRegistry:
""" Keeps a record of connected sockets per user
"""Keeps a record of connected sockets per user
redis structure is following
Redis Hash: key=user_id:client_session_id values={server_id socket_id project_id}
redis structure is following
Redis Hash: key=user_id:client_session_id values={server_id socket_id project_id}
"""

app: web.Application
Expand Down Expand Up @@ -92,12 +92,12 @@ async def find_keys(self, resource: Tuple[str, str]) -> List[Dict[str, str]]:
keys.append(self._decode_hash_key(hash_key))
return keys

async def set_key_alive(
self, key: Dict[str, str], alive: bool, timeout: int = 0
) -> None:
async def set_key_alive(self, key: Dict[str, str], timeout: int) -> None:
# setting the timeout to always expire, timeout > 0
timeout = int(max(1, timeout))
client = get_redis_client(self.app)
hash_key = f"{self._hash_key(key)}:{ALIVE_SUFFIX}"
await client.set(hash_key, 1, expire=0 if alive else timeout)
await client.set(hash_key, 1, expire=timeout)

async def is_key_alive(self, key: Dict[str, str]) -> bool:
client = get_redis_client(self.app)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,10 @@ async def set_socket_id(self, socket_id: str) -> None:
)
registry = get_registry(self.app)
await registry.set_resource(self._resource_key(), (SOCKET_ID_KEY, socket_id))
await registry.set_key_alive(self._resource_key(), True)
# hearthbeat is not emulated in tests, make sure that with very small GC intervals
# the resources do not result as timeout; this value is usually in the order of minutes
timeout = max(3, get_service_deletion_timeout(self.app))
await registry.set_key_alive(self._resource_key(), timeout)

async def get_socket_id(self) -> Optional[str]:
log.debug(
Expand All @@ -70,7 +73,7 @@ async def user_pressed_disconnect(self) -> None:
"""When the user disconnects expire as soon as possible the alive key
to ensure garbage collection will trigger in the next 2 cycles."""
registry = get_registry(self.app)
await registry.set_key_alive(self._resource_key(), False, 1)
await registry.set_key_alive(self._resource_key(), 1)

async def remove_socket_id(self) -> None:
log.debug(
Expand All @@ -81,14 +84,14 @@ async def remove_socket_id(self) -> None:
registry = get_registry(self.app)
await registry.remove_resource(self._resource_key(), SOCKET_ID_KEY)
await registry.set_key_alive(
self._resource_key(), False, get_service_deletion_timeout(self.app)
self._resource_key(), get_service_deletion_timeout(self.app)
)

async def set_heartbeat(self) -> None:
"""Refreshes heartbeat """
registry = get_registry(self.app)
await registry.set_key_alive(
self._resource_key(), False, get_service_deletion_timeout(self.app)
self._resource_key(), get_service_deletion_timeout(self.app)
)

async def find_socket_ids(self) -> List[str]:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,17 +110,21 @@ async def test_redis_registry(loop, redis_registry):
assert all(x in [key, second_key] for x in found_keys)
assert not await redis_registry.find_keys(invalid_resource)

# create alive key
await redis_registry.set_key_alive(key, True)
DEAD_KEY_TIMEOUT = 1
STILL_ALIVE_KEY_TIMEOUT = DEAD_KEY_TIMEOUT + 1

# create a key which will be alive when testing
await redis_registry.set_key_alive(key, STILL_ALIVE_KEY_TIMEOUT)
assert await redis_registry.is_key_alive(key) == True
# create soon to be dead key
TIMEOUT = 3
await redis_registry.set_key_alive(second_key, False, TIMEOUT)
await redis_registry.set_key_alive(second_key, DEAD_KEY_TIMEOUT)
alive_keys, dead_keys = await redis_registry.get_all_resource_keys()
assert not dead_keys
assert all(x in alive_keys for x in [key, second_key])
assert all(x in [key, second_key] for x in alive_keys)
time.sleep(TIMEOUT)

time.sleep(DEAD_KEY_TIMEOUT)

assert await redis_registry.is_key_alive(second_key) == False
alive_keys, dead_keys = await redis_registry.get_all_resource_keys()
assert alive_keys == [key]
Expand All @@ -137,6 +141,29 @@ async def test_redis_registry(loop, redis_registry):
)


async def test_redis_registry_key_will_always_expire(loop, redis_registry):
get_random_int = lambda: randint(1, 10)
first_key = {f"key_{x}": f"value_{x}" for x in range(get_random_int())}
second_key = {f"sec_key_{x}": f"sec_value_{x}" for x in range(get_random_int())}

resources = [(f"res_key{x}", f"res_value{x}") for x in range(get_random_int())]
for resource in resources:
await redis_registry.set_resource(first_key, resource)
await redis_registry.set_resource(second_key, resource)

await redis_registry.set_key_alive(first_key, 0)
await redis_registry.set_key_alive(second_key, -3000)

time.sleep(1) # minimum amount of sleep

assert await redis_registry.is_key_alive(second_key) == False
assert await redis_registry.is_key_alive(first_key) == False

alive_keys, dead_keys = await redis_registry.get_all_resource_keys()
assert len(alive_keys) == 0
assert len(dead_keys) == 2


async def test_websocket_manager(loop, redis_enabled_app, redis_registry, user_ids):

# create some user ids and socket ids
Expand Down

0 comments on commit fe3ea41

Please sign in to comment.