Skip to content

Commit

Permalink
[#144] Integrate ElasticSearch
Browse files Browse the repository at this point in the history
  • Loading branch information
Michal-Kolomanski committed Apr 26, 2022
1 parent 79c6e9d commit a77d21d
Show file tree
Hide file tree
Showing 29 changed files with 601 additions and 637 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased] - YYYY-MM-DD
### Added
- Integrate ElasticSearch results from the Marketplace [@Michal-Kolomanski]
- JMS Subscriber for user actions and flask cli for it [@michal-szostak]
- search_data migration [@Michal-Kolomanski]
- tqdm and logging in embedding pipeline [@Michal-Kolomanski]
Expand Down
2 changes: 1 addition & 1 deletion Pipfile
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ markupsafe = "==2.0.1"
flask-restx = "==0.3.0"
jsonref = "*"
json-ref-dict = "*"
pymongo = "*"
mongoengine = "*"
flask-mongoengine = "*"
inflection = "*"
Expand All @@ -38,6 +37,7 @@ sentry-sdk = {extras = ["flask"], version = "*"}
ray = {version = "==1.7.0", extras = ["default"]}
celery = {version = "==5.1.2", extras = ["redis"]}
"stomp.py" = "==7.0.0"
pymongo = "==3.12"
pydantic = "*"

[dev-packages]
Expand Down
335 changes: 167 additions & 168 deletions Pipfile.lock

Large diffs are not rendered by default.

9 changes: 9 additions & 0 deletions recommender/api/schemas/recommendation.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,15 @@
description="Version of the recommendation engine",
example="RL",
),
"elastic_services": fields.List(
fields.Integer(
title="Service ID", description="The unique identifier of the service"
),
required=True,
title="ElasticSearch services list",
description="List of services IDs from ElasticSearch",
example=[1, 2, 3, 4],
),
"search_data": fields.Nested(search_data, required=True),
},
)
Expand Down
41 changes: 20 additions & 21 deletions recommender/engines/base/base_inference_component.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

from abc import ABC, abstractmethod
import random
from typing import Dict, Any, List
from typing import Dict, Any, List, Tuple

from recommender.engines.panel_id_to_services_number_mapping import PANEL_ID_TO_K
from recommender.errors import (
Expand Down Expand Up @@ -46,11 +46,13 @@ def __call__(self, context: Dict[str, Any]) -> List[int]:
"""

user = self._get_user(context)
search_data = self._get_search_data(context)
elastic_services, search_data = self._get_elastic_services_and_search_data(
context
)

if user is not None:
return self._for_logged_user(user, search_data)
return self._for_anonymous_user(search_data)
return self._for_logged_user(user, elastic_services, search_data)
return self._for_anonymous_user(elastic_services)

@abstractmethod
def _load_models(self) -> None:
Expand All @@ -76,46 +78,43 @@ def _get_user(self, context: Dict[str, Any]) -> User:

return user

def _get_search_data(self, context: Dict[str, Any]) -> SearchData:
search_data = context.get("search_data")
search_data.pop(
"rating", None
) # We don't and we shouldn't take rating into consideration

# To prevent q being None (for SearchPhraseEncoder it must be a string)
search_data["q"] = search_data.get("q", "")

search_data = SearchData(**search_data)

return search_data
@staticmethod
def _get_elastic_services_and_search_data(
    context: Dict[str, Any]
) -> Tuple[Tuple[int, ...], SearchData]:
    """Extract the elastic services ID tuple and the search data from the
    recommendation request context.

    Args:
        context: deserialized request body; expected to carry an
            "elastic_services" iterable of service IDs and a
            "search_data" entry.

    Returns:
        Pair of (service IDs as a tuple, search data).
    """
    # `or ()` guards against a missing/None "elastic_services" key,
    # which would otherwise make tuple() raise TypeError. Downstream
    # code already handles an empty item space (it raises
    # InsufficientRecommendationSpaceError when too few candidates).
    return tuple(context.get("elastic_services") or ()), context.get("search_data")

@abstractmethod
def _for_logged_user(
    self, user: User, elastic_services: Tuple[int, ...], search_data: SearchData
) -> List[int]:
    """
    Generate a recommendation for a logged-in user.

    Args:
        user: user for whom the recommendation will be generated.
        elastic_services: item space (service IDs) from the Marketplace.
        search_data: search phrase and filters information for narrowing
            down an item space.
    Returns:
        recommended_services_ids: List of recommended services ids.
    """

def _for_anonymous_user(self, search_data: SearchData) -> List[int]:
def _for_anonymous_user(self, elastic_services: Tuple[int]) -> List[int]:
"""
Generate recommendation for anonymous user
Args:
search_data: search phrase and filters information for narrowing
down an item space.
elastic_services: item space from the Marketplace.
Returns:
recommended_services_ids: List of recommended services ids.
"""

candidate_services = list(retrieve_services_for_recommendation(search_data))
candidate_services = list(
retrieve_services_for_recommendation(elastic_services)
)
if len(candidate_services) < self.K:
raise InsufficientRecommendationSpaceError()
recommended_services = random.sample(list(candidate_services), self.K)
Expand Down
11 changes: 8 additions & 3 deletions recommender/engines/ncf/inference/ncf_inference_component.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

"""Neural Collaborative Filtering Inference Component"""

from typing import List
from typing import List, Tuple

import torch

Expand Down Expand Up @@ -73,11 +73,14 @@ def user_and_services_to_tensors(user, services):

return users_ids, users_tensor, services_ids, services_tensor

def _for_logged_user(self, user: User, search_data: SearchData) -> List[int]:
def _for_logged_user(
self, user: User, elastic_services: Tuple[int], search_data: SearchData
) -> List[int]:
"""Generate recommendation for logged user.
Args:
user: user for whom recommendation will be generated.
elastic_services: item space from the Marketplace.
search_data: search phrase and filters information for narrowing
down an item space.
Expand All @@ -86,7 +89,9 @@ def _for_logged_user(self, user: User, search_data: SearchData) -> List[int]:
"""

candidate_services = list(
retrieve_services_for_recommendation(search_data, user.accessed_services)
retrieve_services_for_recommendation(
elastic_services, user.accessed_services
)
)
if len(candidate_services) < self.K:
raise InsufficientRecommendationSpaceError()
Expand Down
13 changes: 8 additions & 5 deletions recommender/engines/rl/inference/rl_inference_component.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,15 +37,18 @@ def _load_models(self) -> None:
)
self.service_selector = ServiceSelector(self.service_embedder)

def _for_logged_user(self, user: User, search_data: SearchData) -> Tuple[int]:
state = create_state(user, search_data)
def _for_logged_user(
    self, user: User, elastic_services: Tuple[int], search_data: SearchData
) -> Tuple[int]:
    """Recommend services for a logged-in user.

    Builds an RL state from the user, the elastic services item space and
    the search data, encodes it, and lets the service selector pick
    service IDs from the actor's weights constrained by the service mask.
    """
    encoded_state = self.state_encoder(
        [create_state(user, elastic_services, search_data)]
    )
    return self.service_selector(
        self._get_weights(encoded_state), self._get_service_mask(encoded_state)
    )

def _get_search_data_mask(self, state_tensors):
@staticmethod
def _get_service_mask(state_tensors):
    """Extract the service mask from the encoded state tensors.

    The mask lives at element [2][0] of the tuple produced by the
    state encoder.
    """
    mask_batch = state_tensors[2]
    return mask_batch[0]

def _get_weights(self, state_tensors):
Expand Down
2 changes: 1 addition & 1 deletion recommender/engines/rl/ml_components/sars_encoder.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,8 @@ def __call__(
states = [SARS.state for SARS in SARSes]
next_states = [SARS.next_state for SARS in SARSes]
all_states = states + next_states
all_states_tensors = self.state_encoder(all_states)

all_states_tensors = self.state_encoder(all_states, verbose=True)
states_tensors = tuple(t[: len(states)] for t in all_states_tensors)
next_states_tensors = tuple(t[len(states) :] for t in all_states_tensors)

Expand Down
4 changes: 4 additions & 0 deletions recommender/engines/rl/ml_components/sarses_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import itertools
import time
from typing import List, Union
from copy import deepcopy
import billiard
import torch
from mongoengine import DoesNotExist
Expand Down Expand Up @@ -252,6 +253,7 @@ def skipable_func(*args, **kwargs):
def generate_sars(recommendation, root_uas):
"""Generate sars for given recommendation and root user actions"""

rec = deepcopy(recommendation) # elastic_services memory leak
user = recommendation.user or _get_empty_user()

# Create reward
Expand All @@ -266,6 +268,7 @@ def generate_sars(recommendation, root_uas):
state = State(
user=user,
services_history=services_history_before,
elastic_services=rec.elastic_services,
search_data=recommendation.search_data,
).save()

Expand All @@ -283,6 +286,7 @@ def generate_sars(recommendation, root_uas):
next_state = State(
user=user,
services_history=services_history_after,
elastic_services=rec.elastic_services,
search_data=next_recommendation.search_data,
).save()

Expand Down
57 changes: 0 additions & 57 deletions recommender/engines/rl/ml_components/search_data_encoder.py

This file was deleted.

84 changes: 84 additions & 0 deletions recommender/engines/rl/ml_components/service_encoder.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
# pylint: disable=missing-module-docstring, no-member, too-few-public-methods, invalid-name, line-too-long
from typing import List
from copy import deepcopy
import torch
from tqdm import tqdm
from recommender.engines.rl.utils import (
create_index_id_map,
get_service_indices,
)
from recommender.models import User, Service, State
from recommender.services.fts import retrieve_forbidden_services
from recommender.engines.rl.ml_components.services_history_generator import (
get_ordered_services,
)
from recommender.errors import SizeOfUsersAndElasticServicesError
from logger_config import get_logger

logger = get_logger(__name__)


class ServiceEncoder:
    """Encodes states' elastic services and users' ordered services into a binary mask.

    Row i of the produced mask corresponds to the (users[i], states[i]) pair:
    it is 1 at the indices of services present in the state's
    ``elastic_services`` item space and 0 everywhere else, with the user's
    already-ordered services and the globally forbidden services forced to 0.
    """

    def __init__(self):
        # All known services sorted by id; self.I is the mask width
        # (one column per service).
        all_services = list(Service.objects.order_by("id"))
        self.I = len(all_services)
        self.index_id_map = create_index_id_map(all_services)

        # Column indices of services that must never be recommended.
        forbidden_services = retrieve_forbidden_services()
        self.forbidden_service_indices = get_service_indices(
            self.index_id_map, forbidden_services.distinct("id")
        )

    def __call__(
        self, users: List[User], states: List[State], verbose=False
    ) -> torch.Tensor:
        """Build a (len(users), self.I) binary mask, one row per (user, state) pair.

        Args:
            users: users the rows of the mask are built for.
            states: states paired 1:1 with ``users``; each carries
                ``elastic_services``. NOTE: this list is consumed
                (emptied) by the loop below.
            verbose: when True, log and show a tqdm progress bar.

        Raises:
            SizeOfUsersAndElasticServicesError: when ``users`` and
                ``states`` differ in length.
        """
        batch_size = len(users)
        states_len = len(states)

        if batch_size != states_len:
            # Each state has elastic_services, so the lists must be paired 1:1.
            raise SizeOfUsersAndElasticServicesError

        mask = torch.zeros(batch_size, self.I)

        if verbose:
            logger.info("Creating mask...")

        for i in tqdm(range(batch_size), desc="States", disable=not verbose):
            # pop(0) drops our reference to each state as it is processed —
            # presumably to keep memory bounded across large batches (TODO confirm).
            state = states.pop(0)

            if states_len == 1:
                # Model-evaluation step: the search_data is not saved, so it
                # cannot be dereferenced. Clearing it avoids a mongodb
                # ValidationError.
                state.search_data = None

            # Merely reading the elastic_services tuple of references makes
            # mongoengine replace the IDs with fully dereferenced Service
            # objects inside the referenced tuple. Since the state object is
            # referenced from elsewhere (it has 3 references), mutating it
            # directly would keep those dereferenced objects alive — a memory
            # leak. Deep-copying first confines the dereferencing to a
            # throwaway copy.
            state = deepcopy(state)
            elastic_services = state.elastic_services
            services_context = self.get_id_from_services(elastic_services)

            ordered_services = get_ordered_services(users[i])

            services_context_indices = get_service_indices(
                self.index_id_map, services_context
            )
            ordered_service_indices = get_service_indices(
                self.index_id_map, [s.id for s in ordered_services]
            )

            # Allow the state's item space, then knock out what the user
            # already ordered and what is globally forbidden.
            mask[i, services_context_indices] = 1
            mask[i, ordered_service_indices] = 0
            mask[i, self.forbidden_service_indices] = 0

        return mask

    @staticmethod
    def get_id_from_services(services: List[Service]) -> List[int]:
        """Return the IDs of the given services, preserving order."""
        return [service.id for service in services]
Loading

0 comments on commit a77d21d

Please sign in to comment.