diff --git a/autocrit/inference/__init__.py b/autocrit/inference/__init__.py
index e69de29..72154a0 100644
--- a/autocrit/inference/__init__.py
+++ b/autocrit/inference/__init__.py
@@ -0,0 +1 @@
+from .inference_hook import vLLMHook, HuggingFaceHook, RewardHook, OpenAIHook
diff --git a/autocrit/inference/inference_hook.py b/autocrit/inference/inference_hook.py
index 00197c5..09c89fd 100644
--- a/autocrit/inference/inference_hook.py
+++ b/autocrit/inference/inference_hook.py
@@ -1,193 +1,452 @@
+import math
 import argparse
+import asyncio
+import json
+import logging
 import os
+import signal
+import subprocess
+import sys
+import time
+from abc import ABC, abstractmethod
+from typing import Any, Dict, List, Optional, Tuple
+from tqdm import tqdm
+import openai
+
+import aiohttp
 import torch
 import transformers
-from typing import Tuple, Any, Optional, List, Dict
 import tritonclient.grpc.aio as grpcclient
-from autocrit.inference.utils import triton_call, best_of_n
+from autocrit.inference.utils import best_of_n, triton_call
 from text_generation import Client
-import logging
-
-'''
-We're using inference hooks rather than directly using hugging face generate incase we want to switch to a triton client at some point.
-This gives us significantly improved flexibility, as autocrit is not built around a single inference API
-'''
-
-
-# Inference hook that takes a model and a batch of inputs and returns a batch of outputs
-class InferenceHook:
-    def __init__(self, dir : str):
-        self.dir = dir
-        self.API_KEY = ""
-        self.API_URL = ""
-
-    def load(self, **kwargs):
+from tqdm.asyncio import tqdm_asyncio
+
+
+class InferenceHook(ABC):
+    def __init__(self, **kwargs):
+        """
+        Args:
+            kwargs (`Dict[str, Any]`): a dictionary of parameters to initialize the model with
+        """
         pass
 
-    # Calls the inference API and returns the result
-    def infer(self, input_texts : List[str],
-              generate_params : Dict[str, Any],
-              **kwargs: Any) -> Any:
+    @abstractmethod
+    def generate(self, prompts: List[str], **kwargs: Dict[str, Any]) -> List[Dict[str, Any]]:
         """
-        input_texts: a list of strings, each string is a prompt
-        generate_params: a dictionary of parameters to pass to the generate function
-        kwargs: any additional arguments to pass to the generate function
+        Args:
+            prompts (`List[str]`): inputs for generations
+            kwargs (`Dict[str, Any]`): parameters to control generation
+
+        Returns:
+            outputs (`List[Dict[str, Any]]`): a list of dictionaries, each dictionary contains the following keys:
+                id (`int`): the id of the prompt
+                prompt (`str`): the prompt
+                outputs (`List[str]`): a list of outputs per prompt
         """
         pass
 
+    @abstractmethod
+    def free(self):
+        """
+        Clean up resources after the inference
+        """
+        pass
+
+    def __del__(self):
+        self.free()
 
-# Inference hook that uses the HuggingFace API to call a model
-class HuggingFaceHook(InferenceHook):
-    def __init__(self, dir : str, tokenizer_name : Optional[str] = None):
+class vLLMHook(InferenceHook):
+    def __init__(self, model_path, tensor_parallel_size=1, num_external_nodes=0):
         """
-        dir: the directory of the model
-        tokenizer_name: the name of the tokenizer to use, if None, use the model name
+        Starts data parallel vLLM servers either locally or on separate nodes by spawning slurm jobs
+
+        Args:
+            model_path (`str`): the path to the model
+            tensor_parallel_size (`int`): the number of GPUs to use per one server
+            num_external_nodes (`int`): spawn this many slurm jobs for the servers, if `0`, use only local resources
         """
-        super().__init__(dir)
-        self.model_name = dir
-        if
tokenizer_name is None: - self.tokenizer_name = dir + self.init_time = time.time() + self.model_path = model_path + self.tensor_parallel_size = tensor_parallel_size + self.num_external_nodes = num_external_nodes + self.nth_request = 0 + + devices = list(map(str, range(torch.cuda.device_count()))) + devices = [",".join(devices[i*tensor_parallel_size:(i+1)*tensor_parallel_size]) for i in range(len(devices) // tensor_parallel_size)] + + if num_external_nodes: + self.job_ids = [] + self.servers = [] + self.data_parallel_size = torch.cuda.device_count() * num_external_nodes // tensor_parallel_size + + sbatch_script_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), "vllm.sbatch") + for _ in range(num_external_nodes): + cmd = f"sbatch {sbatch_script_path} NUM_TP={tensor_parallel_size} MODEL_PATH={model_path} DEVICES={'|'.join(devices)}" + process = subprocess.Popen(cmd.split(), stdout=subprocess.PIPE, env={**os.environ, "TORCHELASTIC_USE_AGENT_STORE": ""}) + + while True: + output = process.stdout.readline().decode("utf-8").strip() + if output == '' and process.poll() is not None: + break + if output: + print(output) + if output.startswith("Submitted batch job"): + self.job_ids.append(output.split()[-1].strip()) + + while not os.path.exists(f"vllm_logs/{self.job_ids[-1]}"): + time.sleep(1) + + with open(f"vllm_logs/{self.job_ids[-1]}") as log: + while True: + output = log.readline().strip() + if output: + print(output) + if output.startswith("HOSTNAME="): + hostname = output.split("=")[-1].strip() + self.servers.extend([f"{hostname}:{8000+i}" for i in range(8 // tensor_parallel_size)]) + break + else: - self.tokenizer_name = tokenizer_name + self.data_parallel_size = torch.cuda.device_count() // tensor_parallel_size + self.servers = [f"localhost:{8000+i}" for i in range(self.data_parallel_size)] + self.processes = [] + for i in range(self.data_parallel_size): + cmd = f"python -m vllm.entrypoints.api_server -tp={tensor_parallel_size} --model={model_path} --port {8000+i}" + kwargs = {"env": {**os.environ, "CUDA_VISIBLE_DEVICES": devices[i], "TORCHELASTIC_USE_AGENT_STORE": ""}} + if not os.environ.get("DEBUG", False): + kwargs["stdout"] = subprocess.DEVNULL + kwargs["stderr"] = subprocess.DEVNULL + + process = subprocess.Popen(cmd.split(), **kwargs) + self.processes.append(process) + + print(f"Loading {self.data_parallel_size} processes for {model_path}...") + + not_loaded = list(self.servers) + while not_loaded: + for server in not_loaded: + try: + asyncio.run(self.request_vllm_api(server=server, prompt=".", max_new_tokens=1)) + not_loaded.remove(server) + except aiohttp.client_exceptions.ClientConnectorError: + break + + time.sleep(1) + + self.load_time = time.time() - self.init_time + print(f"Loaded {model_path} in {self.load_time:.0f}s") + + async def request_vllm_api(self, prompt: str, i=0, num_return_sequences=1, temperature=1.0, max_new_tokens=512, stop=[], server=None, **kwargs): + pload = { + "prompt": prompt, + "n": num_return_sequences, + "temperature": temperature, + "max_tokens": max_new_tokens, + "stop": stop, + "stream": False, + } + + if server is None: + server = self.servers[self.nth_request % self.data_parallel_size] + self.nth_request += 1 + + connector = aiohttp.TCPConnector(limit_per_host=32768) + timeout = aiohttp.ClientTimeout(total=3600) + async with aiohttp.ClientSession(connector=connector, timeout=timeout) as session: + async with session.post(f"http://{server}/generate", json=pload) as response: + try: + data = await response.json() + return {"id": i, 
"prompt": prompt, "outputs": [x[len(prompt):] for x in data["text"]]} + except aiohttp.client.ContentTypeError: + return {"id": i, "prompt": prompt, "outputs": [None]} + + + def generate(self, prompts: List[str], **kwargs) -> List[Dict[str, Any]]: + async def generate_vllm_api(prompts, **kwargs): + outputs = [self.request_vllm_api(prompt=prompt, i=i, **kwargs) for i, prompt in enumerate(prompts)] + return await tqdm_asyncio.gather(*outputs, desc=f"Inferencing {self.model_path}") + + batch_size = 16384 + outputs = [] + for i in range(0, len(prompts), batch_size): + outputs += asyncio.run(generate_vllm_api(prompts[i:i+batch_size], **kwargs)) + + return outputs + + def free(self): + if self.num_external_nodes: + if self.job_ids: + subprocess.run(f"scancel {' '.join(self.job_ids)}".split()) + self.job_ids = [] + else: + for p in self.processes: + os.kill(p.pid, signal.SIGTERM) + p.communicate() + print(f"Unloaded all {self.model_path} processes") + self.processes = [] + +def generate_openai(prompt, model_path="gpt-3.5-turbo", max_new_tokens=128, system_prompt="", temperature=1, stop=[], num_return_sequences=1, **kwargs): + MAX_API_RETRY = 5 + for _ in range(MAX_API_RETRY): + try: + if model_path not in ['gpt-3.5-turbo', 'gpt-4']: + kwargs = {"deployment_id": model_path} + else: + kwargs = {"model": model_path, "stop": stop} + + response = openai.ChatCompletion.create( + messages=[ + {"role": "system", "content": system_prompt}, + {"role": "user", "content": prompt}, + ], + temperature=temperature, + max_tokens=max_new_tokens, + n=num_return_sequences, + **kwargs, + ) + + return [response["choices"][ix]["message"]["content"] for ix in range(num_return_sequences)] + + except Exception as e: + print(e) + time.sleep(10) + + return [None] + +class OpenAIHook(InferenceHook): + def __init__(self, model_path: str): + self.model_path = model_path + + if not os.environ.get("OPENAI_API_KEY") and openai.api_key is None: + raise RuntimeError("OPENAI_API_KEY is not set") + + def generate(self, prompts: List[str], **kwargs) -> List[str]: + outputs = [] + for prompt in tqdm(prompts, desc=f"Inferencing {self.model_path}"): + outputs.append(generate_openai(prompt, model_path=self.model_path, **kwargs)) + + return [{"id": i, "prompt": prompt, "outputs": outputs[i]} for i, prompt in enumerate(prompts)] + + def free(self): + pass - self.model = None - self.tokenizer = None +class RewardHook(InferenceHook): + def __init__(self, model_path: str): + self.init_time = time.time() + self.model_path = model_path + self.nth_request = 0 + + tensor_parallel_size = 1 + devices = list(map(str, range(torch.cuda.device_count()))) + devices = [",".join(devices[i*tensor_parallel_size:(i+1)*tensor_parallel_size]) for i in range(len(devices) // tensor_parallel_size)] + + self.data_parallel_size = torch.cuda.device_count() // tensor_parallel_size + self.servers = [f"localhost:{8000+i}" for i in range(self.data_parallel_size)] + self.processes = [] + for i in range(self.data_parallel_size): + server_script_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), "server.py") + cmd = f"python {server_script_path} --model {model_path} --port {8000+i}" + kwargs = {"env": {**os.environ, "CUDA_VISIBLE_DEVICES": devices[i], "TORCHELASTIC_USE_AGENT_STORE": ""}} + if not os.environ.get("DEBUG", False): + kwargs["stdout"] = subprocess.DEVNULL + kwargs["stderr"] = subprocess.DEVNULL + + process = subprocess.Popen(cmd.split(), **kwargs) + self.processes.append(process) + + print(f"Loading {self.data_parallel_size} processes for 
{model_path}...") + not_loaded = list(self.servers) + while not_loaded: + for server in not_loaded: + try: + asyncio.run(self.request_api(server=server, samples=None)) + not_loaded.remove(server) + except aiohttp.client_exceptions.ClientConnectorError: + break + + time.sleep(1) + + async def request_api(self, samples: List[str], server=None, batch_size=8, max_length=2048): + pload = { + "samples": samples, + "batch_size": batch_size, + "max_length": max_length, + } + + if server is None: + server = self.servers[self.nth_request % self.data_parallel_size] + self.nth_request += 1 + + connector = aiohttp.TCPConnector(limit_per_host=32768) + timeout = aiohttp.ClientTimeout(total=3600) + async with aiohttp.ClientSession(connector=connector, timeout=timeout) as session: + async with session.post(f"http://{server}/score", json=pload) as response: + try: + data = await response.json() + return data + except aiohttp.client.ContentTypeError: + return None + + def score(self, samples: List[str], **kwargs) -> List[Dict[str, Any]]: + async def score_async_api(samples, **kwargs): + batch_size = math.ceil(len(samples) / self.data_parallel_size) + samples_batched = [samples[i*batch_size:(i+1)*batch_size] for i in range(self.data_parallel_size)] + outputs = [self.request_api(samples_batched[i], **kwargs) for i in range(self.data_parallel_size)] + + return await tqdm_asyncio.gather(*outputs, desc=f"Inferencing {self.model_path}") + + return sum(asyncio.run(score_async_api(samples, **kwargs)), []) + + def generate(self, prompts: List[str], **kwargs) -> List[Dict[str, Any]]: + return self.score(prompts, **kwargs) + + def free(self): + for p in self.processes: + os.kill(p.pid, signal.SIGTERM) + p.communicate() + print(f"Unloaded all {self.model_path} processes") + self.processes = [] - def load(self, **kwargs): - # Load the model and tokenizer - self.model = transformers.AutoModelForCausalLM.from_pretrained(self.model_name) - self.tokenizer = transformers.AutoTokenizer.from_pretrained(self.tokenizer_name) - # check if there is a padding token, if not add one - if self.tokenizer.pad_token is None: - self.tokenizer.add_special_tokens({"pad_token": "[PAD]"}) +class HuggingFaceHook(InferenceHook): + """ + Inference hook that uses plain HuggingFace transformers API + """ - def infer(self, input_texts : List[str], - generate_params : Dict[str, Any], - **kwargs: Any) -> Any: + def __init__(self, model_path: str, tokenizer_path : Optional[str] = None): """ - input_texts: a list of strings, each string is a prompt - generate_params: a dictionary of parameters to pass to the generate function - kwargs: any additional arguments to pass to the generate function - returns: a list of strings, each string is a generated output + Args: + model_path (`str`): the directory of the model + tokenizer_path (`str`): the directory of the tokenizer, if None, the `model_path` is implied """ + self.model = transformers.AutoModelForCausalLM.from_pretrained(model_path, device_map="auto") + self.tokenizer = transformers.AutoTokenizer.from_pretrained(tokenizer_path or model_path) + self.tokenizer.padding_side = "left" + self.tokenizer.truncation_side = "left" + if self.tokenizer.pad_token is None: + self.tokenizer.add_special_tokens({"pad_token": "[PAD]"}) - # model.generate, use the tokenizer to convert the output to text and kwds for gen arguments - # assume input and output are batched - inp_text = input_texts - inps = self.tokenizer(inp_text, return_tensors="pt", padding=True).to(self.model.device) + @torch.inference_mode() + def 
generate(self, prompts: List[str], **kwargs: Any) -> List[str]: + stop = kwargs.pop("stop", []) + max_length = kwargs.get("max_length", 2048) + max_new_tokens = kwargs.get("max_new_tokens", 512) + temperature = kwargs.get("temperature", 1.0) + num_return_sequences = kwargs.get("num_return_sequences", 1) - output_txt = self.model.generate(input_ids=inps.input_ids, attention_mask=inps.attention_mask, **generate_params) + outputs = [] + for i in range(len(prompts)): + inputs = self.tokenizer([prompts[i]], return_tensors="pt", padding=True, truncation=True, max_length=max_length).to(self.model.device) - # if we need to decode the text - if not "no_decode" in kwargs: - output_txt = self.tokenizer.batch_decode(output_txt, skip_special_tokens=True) + all_ids = self.model.generate(**inputs, pad_token_id=self.tokenizer.pad_token_id, eos_token_id=self.tokenizer.eos_token_id, max_new_tokens=max_new_tokens, do_sample=temperature > 0, temperature=temperature, num_return_sequences=num_return_sequences) + output_ids = all_ids[:, inputs.input_ids.shape[1]:] + outputs.extend(output_ids) - return output_txt + outputs = self.tokenizer.batch_decode(outputs, skip_special_tokens=True) + + for i in range(len(outputs)): + for s in stop: + if s in outputs[i]: + outputs[i] = outputs[i][:outputs[i].index(s)] + + outputs = [{"id": i, "prompt": p, "outputs": outputs[i*num_return_sequences:(i+1)*num_return_sequences]} for i, p in enumerate(prompts)] + + return outputs + + def free(self): + del self.model + del self.tokenizer -# Inference hook that uses the HuggingFace API to call a model. Uses the best of N sampling method class HuggingFaceHookBestOfN(HuggingFaceHook): - def __init__(self, dir : str, tokenizer_name : Optional[str] = None): + """ + Inference hook that uses the HuggingFace API to call a model. 
Uses the best of N sampling method + """ + + def __init__(self, model_path: str, tokenizer_path : Optional[str] = None): """ - dir: the directory of the model - tokenizer_name: the name of the tokenizer to use, if None, use the model name + Args: + model_path (`str`): the directory of the model + tokenizer_path (`str`): the directory of the tokenizer, if None, the `model_path` is implied """ - super().__init__(dir, tokenizer_name) + super().__init__(model_path, tokenizer_path) - def infer(self, input_texts : List[str], - generate_params : Dict[str, Any], - **kwargs: Any) -> Any: + def generate(self, prompts: List[str], **kwargs: Any) -> List[str]: """ - input_texts: a list of strings, each string is a prompt - generate_params: a dictionary of parameters to pass to the generate function - kwargs: any additional arguments to pass to the generate function - returns: a list of strings, each string is a generated output + Args: + prompts: a list of prompts to generate + kwargs: a dictionary of parameters to pass to the generate function """ - output_txt = best_of_n(self.model, self.tokenizer, input_texts, gen_kwargs=generate_params, **kwargs) + output_txt = best_of_n(self.model, self.tokenizer, prompts, gen_kwargs=kwargs) return output_txt class TritonHook(InferenceHook): - def __init__(self, dir : str, model_name : str, tokenizer_name : Optional[str] = None): + def __init__(self, url: str, model_path: str, tokenizer_path : Optional[str] = None): """ - dir: location of the triton server - model_name: the name of the model to use - tokenizer_name: the name of the tokenizer to use, if None, use the model name + Args: + url (`str`): location of the triton server + model_path (`str`): the name of the model to use + tokenizer_path (`str`): the name of the tokenizer to use, if None, the `model_path` is implied """ - super().__init__(dir) - self.url = dir # url contains host:port + self.url = url # url contains host:port # TODO: if URL is a path to a triton model, we shold load the model and launch the server - self.model_name = model_name - if tokenizer_name is None: - self.tokenizer_name = model_name - else: - self.tokenizer_name = tokenizer_name - - self.client = None - self.tokenizer = None + self.model_path = model_path + if tokenizer_path is None: + self.tokenizer_path = model_path + else: + self.tokenizer_path = tokenizer_path - def load(self, **kwargs): # create a client using url self.client = grpcclient.InferenceServerClient(url=self.url) - self.tokenizer = transformers.AutoTokenizer.from_pretrained(self.tokenizer_name) + self.tokenizer = transformers.AutoTokenizer.from_pretrained(self.tokenizer_path) # check if there is a padding token, if not add one if self.tokenizer.pad_token is None: self.tokenizer.add_special_tokens({"pad_token": "[PAD]"}) - def infer(self, input_texts : List[str], - generate_params : Dict[str, Any], - **kwargs: Any) -> Any: + def generate(self, prompts: List[str], **kwargs: Any) -> List[str]: """ - input_texts: a list of strings, each string is a prompt - generate_params: a dictionary of parameters to pass to the generate function - kwargs: any additional arguments to pass to the generate function - returns: a list of strings, each string is a generated output + Args: + prompts: a list of prompts to generate + kwargs: a dictionary of parameters to pass to the generate function """ # use self.client to call the model # assume input and output are batched - inp_text = input_texts - inps = self.tokenizer(inp_text, return_tensors="pt", padding=True) + inps = 
self.tokenizer(prompts, return_tensors="pt", padding=True)
         # call infer
-        logits, output_txt = triton_call(self.client, self.model_name, inps.input_ids, **generate_params)
+        logits, output_txt = triton_call(self.client, self.model_path, inps.input_ids, **kwargs)
         if not "no_decode" in kwargs:
             output_txt = self.tokenizer.batch_decode(output_txt, skip_special_tokens=True)
         # check if logits are needed
-        if generate_params["return_logits"]:
+        if kwargs.get("return_logits", False):
             return output_txt, logits
         else:
             return output_txt
-
-# Inference hook that uses the HuggingFace API to call a model
-class TextGenerationHook(InferenceHook):
-    def __init__(self, dir : str):
-        super().__init__(dir)
-        self.model_name = dir
-        self.client = None
-    def load(self, **kwargs):
+class TextGenerationHook(InferenceHook):
+    def __init__(self, model_path: str, **kwargs):
+        self.model_path = model_path
         # get num shards and port, used for the launcher script
         num_shards = kwargs.get("num_shards", 1)
         port = kwargs.get("port", 8080)
         # check if model name is a URL
-        if not self.model_name.startswith("http"):
+        if not self.model_path.startswith("http"):
             # launch the model using the model name and text_generation_launcher.sh
             # The following line runs the launcher script
-            output = subprocess.run(["sbatch./launch.sbatch MODEL_NAME="+str(self.model_name) + " NUM_SHARD="+str(num_shards) + " PORT="+str(port)], capture_output=True)
+            output = subprocess.run(("sbatch ./launch.sbatch MODEL_NAME="+str(self.model_path) + " NUM_SHARD="+str(num_shards) + " PORT="+str(port)).split(), capture_output=True)
             logging.info(output.stdout.decode("utf-8"))
             # check return code
             if output.returncode != 0:
                 logging.log(logging.ERROR, output.stderr.decode("utf-8"))
                 raise RuntimeError("Failed to launch model")
-            else:
+            else:
                 logging.info("Model launched successfully.")
             # get the job id from the output
@@ -205,22 +464,18 @@ def load(self, **kwargs):
             # Create the client
             self.client = Client(f"http://{ip}:{port}")
         else:
-            self.client = Client(self.model_name)
+            self.client = Client(self.model_path)
 
-    def infer(self, input_texts : List[str],
-              generate_params : Dict[str, Any],
-              **kwargs: Any) -> Any:
+    def generate(self, prompts: List[str], **kwargs: Dict[str, Any]) -> List[str]:
         """
-        input_texts: a list of strings, each string is a prompt
-        generate_params: a dictionary of parameters to pass to the generate function
-        kwargs: any additional arguments to pass to the generate function
-        returns: a list of strings, each string is a generated output
+        Args:
+            prompts (`List[str]`): a list of strings, each string is a prompt
+            kwargs (`Dict[str, Any]`): a dictionary of parameters to pass to the generate function
         """
+        input_texts = prompts
         # if input_texts is a list, convert it to a string
         if isinstance(input_texts, list):
             input_texts = input_texts[0]
         # use self.client to call the model
-        output_txt = self.client.generate(input_texts, **generate_params).generated_text
-        #print(output_txt)
+        output_txt = self.client.generate(input_texts, **kwargs).generated_text
         return output_txt
diff --git a/autocrit/inference/vllm.sbatch b/autocrit/inference/vllm.sbatch
new file mode 100644
index 0000000..c0f3d6b
--- /dev/null
+++ b/autocrit/inference/vllm.sbatch
@@ -0,0 +1,48 @@
+#!/bin/bash
+#SBATCH --job-name=vllm
+#SBATCH --partition=g40
+#SBATCH --account=stability
+#SBATCH --nodes=1
+#SBATCH --ntasks-per-node=1
+#SBATCH --mem=0
+#SBATCH --cpus-per-task=64
+#SBATCH --output=vllm_logs/%j
+#SBATCH --exclusive
+
+ray stop
+
+for ARGUMENT in "$@"
+do
+    KEY=$(echo $ARGUMENT | cut -f1 -d=)
+
+    KEY_LENGTH=${#KEY}
VALUE="${ARGUMENT:$KEY_LENGTH+1}" + + export "$KEY"="$VALUE" +done + +# check if argument contains MODEL, NUMTP +if [ -z "$MODEL_PATH" ] || [ -z "$NUM_TP" ]; then + echo "Please provide MODEL, NUM_TP" + exit 1 +fi + +# replace '|' with ' ' for cuda devices separator to iterate over +export DEVICES=${DEVICES//|/ } +export HOSTNAMES=$(scontrol show hostnames "$SLURM_JOB_NODELIST") + +echo MODEL_PATH=$MODEL_PATH +echo NUM_TP=$NUM_TP +echo DEVICES=$DEVICES +echo HOSTNAME=$HOSTNAMES + +echo $VIRTUAL_ENV +module load cuda/11.8 + +ix=0 +for devices in $DEVICES; do + CUDA_VISIBLE_DEVICES=$devices python -m vllm.entrypoints.api_server -tp=$NUM_TP --model=$MODEL_PATH --host=0.0.0.0 --port $((8000+ix)) & + ix=$((ix+1)) +done + +wait