From 1cbdf8e1c1ef9776f0de776153f75e254f1717c8 Mon Sep 17 00:00:00 2001 From: maxreciprocate <56548574+maxreciprocate@users.noreply.github.com> Date: Wed, 30 Aug 2023 18:18:05 +0300 Subject: [PATCH 01/12] feat(inference): add vLLM inference --- autocrit/inference/__init__.py | 1 + autocrit/inference/inference_hook.py | 95 +++++++++++++++++++++++----- 2 files changed, 80 insertions(+), 16 deletions(-) diff --git a/autocrit/inference/__init__.py b/autocrit/inference/__init__.py index e69de29..487e094 100644 --- a/autocrit/inference/__init__.py +++ b/autocrit/inference/__init__.py @@ -0,0 +1 @@ +from .inference_hook import VLLMHook diff --git a/autocrit/inference/inference_hook.py b/autocrit/inference/inference_hook.py index 00197c5..fcca4ae 100644 --- a/autocrit/inference/inference_hook.py +++ b/autocrit/inference/inference_hook.py @@ -1,3 +1,11 @@ +import aiohttp +import asyncio +import json +import subprocess +import torch +import time +from tqdm.asyncio import tqdm_asyncio + import argparse import os import torch @@ -6,7 +14,7 @@ import tritonclient.grpc.aio as grpcclient from autocrit.inference.utils import triton_call, best_of_n from text_generation import Client -import logging +import logging ''' We're using inference hooks rather than directly using hugging face generate incase we want to switch to a triton client at some point. @@ -20,13 +28,13 @@ def __init__(self, dir : str): self.dir = dir self.API_KEY = "" self.API_URL = "" - + def load(self, **kwargs): pass # Calls the inference API and returns the result - def infer(self, input_texts : List[str], - generate_params : Dict[str, Any], + def infer(self, input_texts : List[str], + generate_params : Dict[str, Any], **kwargs: Any) -> Any: """ input_texts: a list of strings, each string is a prompt @@ -35,7 +43,62 @@ def infer(self, input_texts : List[str], """ pass - +class VLLMHook: + def __init__(self, model_path, tensor_parallel_size=1): + self.data_parallel_size = torch.cuda.device_count() // tensor_parallel_size + self.tensor_parallel_size = tensor_parallel_size + + devices = list(map(str, range(torch.cuda.device_count()))) + devices = [",".join(devices[i:i+tensor_parallel_size]) for i in range(self.data_parallel_size)] + + self.processes = [] + for i in range(self.data_parallel_size): + cmd = f"python -m vllm.entrypoints.api_server -tp={tensor_parallel_size} --model={model_path} --port {8000+i}" + process = subprocess.Popen(cmd.split(), env={**os.environ, "CUDA_VISIBLE_DEVICES": devices[i]}) + self.processes.append(process) + + while True: + all_loaded = True + try: + asyncio.run(self.request_vllm_api(prompt="", port=8000 + self.data_parallel_size-1)) + except aiohttp.client_exceptions.ClientConnectorError: + all_loaded = False + + if all_loaded: + break + + time.sleep(1) + + async def request_vllm_api(self, prompt: str, i = 0, port=8000, n=1, temperature=0.0, max_new_tokens=512, stop=[]): + pload = { + "prompt": prompt, + "n": n, + "temperature": temperature, + "max_tokens": max_new_tokens, + "stop": stop, + "stream": False, + } + + connector = aiohttp.TCPConnector(limit_per_host=1024) + timeout = aiohttp.ClientTimeout(total=9000) + async with aiohttp.ClientSession(connector=connector, timeout=timeout) as session: + async with session.post(f"http://localhost:{port}/generate", json=pload) as response: + try: + data = await response.json() + return {"id": i, "prompt": prompt, "output": [x[len(prompt):] for x in data["text"]]} + except aiohttp.client.ContentTypeError: + return {"id": i, "prompt": prompt, "output": [None]} + + def 
generate(self, prompts, **kwargs): + async def generate_vllm_api(prompts, **kwargs): + outputs = [self.request_vllm_api(prompt, i=i, **kwargs, port=8000 + i % self.data_parallel_size) for i, prompt in enumerate(prompts)] + return await tqdm_asyncio.gather(*outputs) + + return asyncio.run(generate_vllm_api(prompts, **kwargs)) + + def __del__(self): + for p in self.processes: + p.kill() # Inference hook that uses the HuggingFace API to call a model @@ -64,8 +127,8 @@ def load(self, **kwargs): if self.tokenizer.pad_token is None: self.tokenizer.add_special_tokens({"pad_token": "[PAD]"}) - def infer(self, input_texts : List[str], - generate_params : Dict[str, Any], + def infer(self, input_texts : List[str], + generate_params : Dict[str, Any], **kwargs: Any) -> Any: """ input_texts: a list of strings, each string is a prompt @@ -96,8 +159,8 @@ def __init__(self, dir : str, tokenizer_name : Optional[str] = None): """ super().__init__(dir, tokenizer_name) - def infer(self, input_texts : List[str], - generate_params : Dict[str, Any], + def infer(self, input_texts : List[str], + generate_params : Dict[str, Any], **kwargs: Any) -> Any: """ input_texts: a list of strings, each string is a prompt @@ -123,7 +186,7 @@ def __init__(self, dir : str, model_name : str, tokenizer_name : Optional[str] = self.model_name = model_name if tokenizer_name is None: self.tokenizer_name = model_name - else: + else: self.tokenizer_name = tokenizer_name self.client = None @@ -138,8 +201,8 @@ def load(self, **kwargs): self.tokenizer.add_special_tokens({"pad_token": "[PAD]"}) - def infer(self, input_texts : List[str], - generate_params : Dict[str, Any], + def infer(self, input_texts : List[str], + generate_params : Dict[str, Any], **kwargs: Any) -> Any: """ input_texts: a list of strings, each string is a prompt @@ -163,7 +226,7 @@ def infer(self, input_texts : List[str], return output_txt, logits else: return output_txt - + # Inference hook that uses the HuggingFace API to call a model class TextGenerationHook(InferenceHook): @@ -187,7 +250,7 @@ def load(self, **kwargs): if output.returncode != 0: logging.log(logging.ERROR, output.stderr.decode("utf-8")) raise RuntimeError("Failed to launch model") - else: + else: logging.info("Model launched successfully.") # get the job id from the output @@ -207,8 +270,8 @@ def load(self, **kwargs): else: self.client = Client(self.model_name) - def infer(self, input_texts : List[str], - generate_params : Dict[str, Any], + def infer(self, input_texts : List[str], + generate_params : Dict[str, Any], **kwargs: Any) -> Any: """ input_texts: a list of strings, each string is a prompt From 4ead7a175d2ccb58f33f2a4429c827e728fb52ba Mon Sep 17 00:00:00 2001 From: maxreciprocate <56548574+maxreciprocate@users.noreply.github.com> Date: Wed, 30 Aug 2023 18:25:30 +0300 Subject: [PATCH 02/12] refactor(inference): consistent vllm naming --- autocrit/inference/__init__.py | 2 +- autocrit/inference/inference_hook.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/autocrit/inference/__init__.py b/autocrit/inference/__init__.py index 487e094..130c962 100644 --- a/autocrit/inference/__init__.py +++ b/autocrit/inference/__init__.py @@ -1 +1 @@ -from .inference_hook import VLLMHook +from .inference_hook import vLLMHook diff --git a/autocrit/inference/inference_hook.py b/autocrit/inference/inference_hook.py index fcca4ae..fd14c8a 100644 --- a/autocrit/inference/inference_hook.py +++ b/autocrit/inference/inference_hook.py @@ -43,7 +43,7 @@ def infer(self, input_texts : List[str], """ 
pass -class VLLMHook: +class vLLMHook: def __init__(self, model_path, tensor_parallel_size=1): self.data_parallel_size = torch.cuda.device_count() // tensor_parallel_size self.tensor_parallel_size = tensor_parallel_size From b0fefdbdfda9cc4c5d8965592d2fac2497125963 Mon Sep 17 00:00:00 2001 From: maxreciprocate <56548574+maxreciprocate@users.noreply.github.com> Date: Thu, 7 Sep 2023 19:55:59 +0300 Subject: [PATCH 03/12] fix(inference): proper unloading of vllm servers --- autocrit/inference/__init__.py | 2 +- autocrit/inference/inference_hook.py | 121 +++++++++++++-------------- 2 files changed, 59 insertions(+), 64 deletions(-) diff --git a/autocrit/inference/__init__.py b/autocrit/inference/__init__.py index 130c962..93f0ae3 100644 --- a/autocrit/inference/__init__.py +++ b/autocrit/inference/__init__.py @@ -1 +1 @@ -from .inference_hook import vLLMHook +from .inference_hook import vLLMHook, HuggingFaceHook diff --git a/autocrit/inference/inference_hook.py b/autocrit/inference/inference_hook.py index fd14c8a..daf2c7b 100644 --- a/autocrit/inference/inference_hook.py +++ b/autocrit/inference/inference_hook.py @@ -1,3 +1,4 @@ +import signal import aiohttp import asyncio import json @@ -6,6 +7,7 @@ import time from tqdm.asyncio import tqdm_asyncio +from abc import ABC, abstractmethod import argparse import os import torch @@ -22,54 +24,58 @@ ''' -# Inference hook that takes a model and a batch of inputs and returns a batch of outputs -class InferenceHook: - def __init__(self, dir : str): - self.dir = dir - self.API_KEY = "" - self.API_URL = "" - - def load(self, **kwargs): +class InferenceHook(ABC): + def __init__(self, **kwargs): + """ + kwargs: a dictionary of parameters to pass to initilize the model + """ pass - # Calls the inference API and returns the result - def infer(self, input_texts : List[str], - generate_params : Dict[str, Any], - **kwargs: Any) -> Any: + @abstractmethod + def generate(self, prompts: List[str], **kwargs: Dict[str, Any]): """ - input_texts: a list of strings, each string is a prompt - generate_params: a dictionary of parameters to pass to the generate function - kwargs: any additional arguments to pass to the generate function + prompts: a list of strings, each string is a prompt + kwargs: a dictionary of parameters to pass to the generate function """ pass -class vLLMHook: + @abstractmethod + def unload(self): + pass + +class vLLMHook(InferenceHook): def __init__(self, model_path, tensor_parallel_size=1): - self.data_parallel_size = torch.cuda.device_count() // tensor_parallel_size + self.model_path = model_path self.tensor_parallel_size = tensor_parallel_size + self.data_parallel_size = torch.cuda.device_count() // tensor_parallel_size + self.nth_request = 0 devices = list(map(str, range(torch.cuda.device_count()))) - devices = [",".join(devices[i:i+tensor_parallel_size]) for i in range(self.data_parallel_size)] + devices = [",".join(devices[i*tensor_parallel_size:(i+1)*tensor_parallel_size]) for i in range(self.data_parallel_size)] self.processes = [] for i in range(self.data_parallel_size): cmd = f"python -m vllm.entrypoints.api_server -tp={tensor_parallel_size} --model={model_path} --port {8000+i}" - process = subprocess.Popen(cmd.split(), env={**os.environ, "CUDA_VISIBLE_DEVICES": devices[i]}) + process = subprocess.Popen(cmd.split(), stdout=subprocess.PIPE, stderr=subprocess.PIPE, env={**os.environ, "CUDA_VISIBLE_DEVICES": devices[i]}) self.processes.append(process) + print(f"Loading {self.data_parallel_size} processes for {model_path}...") + while True: 
all_loaded = True try: - asyncio.run(self.request_vllm_api(prompt="", port=8000 + self.data_parallel_size-1)) + asyncio.run(self.request_vllm_api(prompt="")) except aiohttp.client_exceptions.ClientConnectorError: all_loaded = False if all_loaded: + print(f"Loaded {model_path}") + time.sleep(5) break time.sleep(1) - async def request_vllm_api(self, prompt: str, i = 0, port=8000, n=1, temperature=0.0, max_new_tokens=512, stop=[]): + async def request_vllm_api(self, prompt: str, i=0, n=1, temperature=0.0, max_new_tokens=512, stop=[]): pload = { "prompt": prompt, "n": n, @@ -79,6 +85,8 @@ async def request_vllm_api(self, prompt: str, i = 0, port=8000, n=1, temperature "stream": False, } + port = 8000 + self.nth_request % self.data_parallel_size + self.nth_request += 1 connector = aiohttp.TCPConnector(limit_per_host=1024) timeout = aiohttp.ClientTimeout(total=9000) async with aiohttp.ClientSession(connector=connector, timeout=timeout) as session: @@ -89,66 +97,53 @@ async def request_vllm_api(self, prompt: str, i = 0, port=8000, n=1, temperature except aiohttp.client.ContentTypeError: return {"id": i, "prompt": prompt, "output": [None]} + def generate(self, prompts, **kwargs): async def generate_vllm_api(prompts, **kwargs): - outputs = [self.request_vllm_api(prompt, i=i, **kwargs, port=8000 + i % self.data_parallel_size) for i, prompt in enumerate(prompts)] - return await tqdm_asyncio.gather(*outputs) + outputs = [self.request_vllm_api(prompt, i=i, **kwargs) for i, prompt in enumerate(prompts)] + return await tqdm_asyncio.gather(*outputs, desc=f"Inferencing {self.model_path}") return asyncio.run(generate_vllm_api(prompts, **kwargs)) - def __del__(self): + def unload(self): for p in self.processes: - p.kill() + os.kill(p.pid, signal.SIGKILL) + p.communicate() + print(f"Offloaded all {self.model_path} processes") # Inference hook that uses the HuggingFace API to call a model class HuggingFaceHook(InferenceHook): - def __init__(self, dir : str, tokenizer_name : Optional[str] = None): - """ - dir: the directory of the model - tokenizer_name: the name of the tokenizer to use, if None, use the model name - """ - super().__init__(dir) - self.model_name = dir - if tokenizer_name is None: - self.tokenizer_name = dir - else: - self.tokenizer_name = tokenizer_name + def __init__(self, model_path: str, tokenizer_path : Optional[str] = None): + self.model = transformers.AutoModelForCausalLM.from_pretrained(model_path, device_map="auto") + self.tokenizer = transformers.AutoTokenizer.from_pretrained(tokenizer_path or model_path) + if self.tokenizer.pad_token is None: + self.tokenizer.add_special_tokens({"pad_token": "[PAD]"}) - self.model = None - self.tokenizer = None + @torch.inference_mode() + def generate(self, prompts: List[str], **kwargs: Any) -> List[str]: + stop = kwargs.pop("stop", []) - def load(self, **kwargs): - # Load the model and tokenizer - self.model = transformers.AutoModelForCausalLM.from_pretrained(self.model_name) - self.tokenizer = transformers.AutoTokenizer.from_pretrained(self.tokenizer_name) + inputs = self.tokenizer(prompts, return_tensors="pt", padding=True, truncation=True, max_length=kwargs.get("max_length", 2048)).to(self.model.device) - # check if there is a padding token, if not add one - if self.tokenizer.pad_token is None: - self.tokenizer.add_special_tokens({"pad_token": "[PAD]"}) + all_ids = self.model.generate(**inputs, pad_token_id=self.tokenizer.pad_token_id, eos_token_id=self.tokenizer.eos_token_id, **kwargs) + output_ids = all_ids[:, inputs.input_ids.shape[1]:] - def 
infer(self, input_texts : List[str], - generate_params : Dict[str, Any], - **kwargs: Any) -> Any: - """ - input_texts: a list of strings, each string is a prompt - generate_params: a dictionary of parameters to pass to the generate function - kwargs: any additional arguments to pass to the generate function - returns: a list of strings, each string is a generated output - """ + if "no_decode" in kwargs: + return output - # model.generate, use the tokenizer to convert the output to text and kwds for gen arguments - # assume input and output are batched - inp_text = input_texts - inps = self.tokenizer(inp_text, return_tensors="pt", padding=True).to(self.model.device) + outputs = self.tokenizer.batch_decode(output_ids, skip_special_tokens=True) - output_txt = self.model.generate(input_ids=inps.input_ids, attention_mask=inps.attention_mask, **generate_params) + for i in range(len(outputs)): + for s in stop: + if s in outputs[i]: + outputs[i] = outputs[i][:outputs[i].index(s)] - # if we need to decode the text - if not "no_decode" in kwargs: - output_txt = self.tokenizer.batch_decode(output_txt, skip_special_tokens=True) + return outputs - return output_txt + def unload(self): + del self.model + del self.tokenizer # Inference hook that uses the HuggingFace API to call a model. Uses the best of N sampling method class HuggingFaceHookBestOfN(HuggingFaceHook): From 0d8e17aeb36856aa7533bce320f04f1b1e96d49c Mon Sep 17 00:00:00 2001 From: maxreciprocate <56548574+maxreciprocate@users.noreply.github.com> Date: Wed, 13 Sep 2023 00:12:29 +0300 Subject: [PATCH 04/12] feat: add multi-node vllm inference --- autocrit/inference/inference_hook.py | 134 +++++++++++++++++++-------- 1 file changed, 95 insertions(+), 39 deletions(-) diff --git a/autocrit/inference/inference_hook.py b/autocrit/inference/inference_hook.py index daf2c7b..c62e341 100644 --- a/autocrit/inference/inference_hook.py +++ b/autocrit/inference/inference_hook.py @@ -18,99 +18,151 @@ from text_generation import Client import logging -''' -We're using inference hooks rather than directly using hugging face generate incase we want to switch to a triton client at some point. 
-This gives us significantly improved flexibility, as autocrit is not built around a single inference API -''' +import signal +import sys class InferenceHook(ABC): def __init__(self, **kwargs): """ - kwargs: a dictionary of parameters to pass to initilize the model + kwargs: a dictionary of parameters to initilize the model """ pass @abstractmethod def generate(self, prompts: List[str], **kwargs: Dict[str, Any]): """ - prompts: a list of strings, each string is a prompt - kwargs: a dictionary of parameters to pass to the generate function + prompts: inputs for generations + kwargs: parameters to control generation """ pass @abstractmethod - def unload(self): + def free(self): + """ + Clean up resources after inference + """ pass class vLLMHook(InferenceHook): - def __init__(self, model_path, tensor_parallel_size=1): + def __init__(self, model_path, tensor_parallel_size=1, external=False, num_nodes=1): self.model_path = model_path self.tensor_parallel_size = tensor_parallel_size - self.data_parallel_size = torch.cuda.device_count() // tensor_parallel_size + self.external = external self.nth_request = 0 devices = list(map(str, range(torch.cuda.device_count()))) - devices = [",".join(devices[i*tensor_parallel_size:(i+1)*tensor_parallel_size]) for i in range(self.data_parallel_size)] + devices = [",".join(devices[i*tensor_parallel_size:(i+1)*tensor_parallel_size]) for i in range(len(devices) // tensor_parallel_size)] + + if external: + self.job_ids = [] + self.servers = [] + self.data_parallel_size = torch.cuda.device_count() * num_nodes // tensor_parallel_size + + sbatch_script_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), "vllm.sbatch") + for _ in range(num_nodes): + cmd = f"sbatch {sbatch_script_path} NUM_TP={tensor_parallel_size} MODEL_PATH={model_path} DEVICES={'.'.join(devices)}" + print(f'{cmd=}') + process = subprocess.Popen(cmd.split(), stdout=subprocess.PIPE, stderr=subprocess.PIPE) + + while True: + output = process.stdout.readline().decode("utf-8").strip() + if output == '' and process.poll() is not None: + break + if output: + print(output) + if output.startswith("Submitted batch job"): + self.job_ids.append(output.split()[-1].strip()) + + while not os.path.exists(f"{self.job_ids[-1]}"): + time.sleep(1) + + with open(f"{self.job_ids[-1]}") as log: + while True: + output = log.readline().strip() + if output: + print(output) + if output.startswith("HOSTNAME="): + hostname = output.split("=")[-1].strip() + self.servers.extend([f"{hostname}:{8000+i}" for i in range(8 // tensor_parallel_size)]) + break - self.processes = [] - for i in range(self.data_parallel_size): - cmd = f"python -m vllm.entrypoints.api_server -tp={tensor_parallel_size} --model={model_path} --port {8000+i}" - process = subprocess.Popen(cmd.split(), stdout=subprocess.PIPE, stderr=subprocess.PIPE, env={**os.environ, "CUDA_VISIBLE_DEVICES": devices[i]}) - self.processes.append(process) + else: + self.data_parallel_size = torch.cuda.device_count() // tensor_parallel_size + self.servers = [f"localhost:{8000+i}" for i in range(self.data_parallel_size)] + self.processes = [] + for i in range(self.data_parallel_size): + cmd = f"python -m vllm.entrypoints.api_server -tp={tensor_parallel_size} --model={model_path} --port {8000+i}" + process = subprocess.Popen(cmd.split(), stdout=subprocess.PIPE, stderr=subprocess.PIPE, env={**os.environ, "CUDA_VISIBLE_DEVICES": devices[i], "TORCHELASTIC_USE_AGENT_STORE": ""}) + self.processes.append(process) - print(f"Loading {self.data_parallel_size} processes for 
{model_path}...") + print(f"Loading {self.data_parallel_size} processes for {model_path}...") while True: all_loaded = True - try: - asyncio.run(self.request_vllm_api(prompt="")) - except aiohttp.client_exceptions.ClientConnectorError: - all_loaded = False + for server in self.servers: + try: + asyncio.run(self.request_vllm_api(server=server, prompt="", max_new_tokens=1)) + except aiohttp.client_exceptions.ClientConnectorError: + all_loaded = False + break if all_loaded: print(f"Loaded {model_path}") - time.sleep(5) + time.sleep(10) break time.sleep(1) - async def request_vllm_api(self, prompt: str, i=0, n=1, temperature=0.0, max_new_tokens=512, stop=[]): + async def request_vllm_api(self, prompt: str, i=0, num_return_sequences=1, temperature=0.0, max_new_tokens=512, stop=[], server=None): pload = { "prompt": prompt, - "n": n, + "n": num_return_sequences, "temperature": temperature, "max_tokens": max_new_tokens, "stop": stop, "stream": False, } - port = 8000 + self.nth_request % self.data_parallel_size - self.nth_request += 1 - connector = aiohttp.TCPConnector(limit_per_host=1024) - timeout = aiohttp.ClientTimeout(total=9000) + if server is None: + server = self.servers[self.nth_request % self.data_parallel_size] + self.nth_request += 1 + + connector = aiohttp.TCPConnector(limit_per_host=8192 * 4) + timeout = aiohttp.ClientTimeout(total=60*60) async with aiohttp.ClientSession(connector=connector, timeout=timeout) as session: - async with session.post(f"http://localhost:{port}/generate", json=pload) as response: + async with session.post(f"http://{server}/generate", json=pload) as response: try: data = await response.json() - return {"id": i, "prompt": prompt, "output": [x[len(prompt):] for x in data["text"]]} + return {"id": i, "prompt": prompt, "outputs": [x[len(prompt):] for x in data["text"]]} except aiohttp.client.ContentTypeError: - return {"id": i, "prompt": prompt, "output": [None]} + return {"id": i, "prompt": prompt, "outputs": [None]} def generate(self, prompts, **kwargs): async def generate_vllm_api(prompts, **kwargs): - outputs = [self.request_vllm_api(prompt, i=i, **kwargs) for i, prompt in enumerate(prompts)] + outputs = [self.request_vllm_api(prompt=prompt, i=i, **kwargs) for i, prompt in enumerate(prompts)] return await tqdm_asyncio.gather(*outputs, desc=f"Inferencing {self.model_path}") - return asyncio.run(generate_vllm_api(prompts, **kwargs)) + batch_size = 8192 + outputs = [] + for i in range(0, len(prompts), batch_size): + outputs += asyncio.run(generate_vllm_api(prompts[i:i+batch_size], **kwargs)) - def unload(self): - for p in self.processes: - os.kill(p.pid, signal.SIGKILL) - p.communicate() - print(f"Offloaded all {self.model_path} processes") + return outputs + + def free(self): + if self.external: + subprocess.run(f"scancel {' '.join(self.job_ids)}".split()) + else: + for p in self.processes: + os.kill(p.pid, signal.SIGTERM) + p.communicate() + print(f"Unloaded all {self.model_path} processes") + time.sleep(10) + def __del__(self): + self.free() # Inference hook that uses the HuggingFace API to call a model class HuggingFaceHook(InferenceHook): @@ -134,14 +186,18 @@ def generate(self, prompts: List[str], **kwargs: Any) -> List[str]: outputs = self.tokenizer.batch_decode(output_ids, skip_special_tokens=True) + for i in range(len(outputs)): for s in stop: if s in outputs[i]: outputs[i] = outputs[i][:outputs[i].index(s)] + num_return_sequences = kwargs.get("num_return_sequences", 1) + outputs = [{"id": i, "prompt": p, "outputs": 
outputs[i*num_return_sequences:(i+1)*num_return_sequences]} for i, p in enumerate(prompts)] + return outputs - def unload(self): + def free(self): del self.model del self.tokenizer From 1b907e74cd3c79f471e706825bbe6acb3e2672f3 Mon Sep 17 00:00:00 2001 From: maxreciprocate <56548574+maxreciprocate@users.noreply.github.com> Date: Thu, 14 Sep 2023 01:24:19 +0300 Subject: [PATCH 05/12] feat(inference): add vllm sbatch script --- autocrit/inference/inference_hook.py | 28 ++++++++--------- autocrit/inference/vllm.sbatch | 47 ++++++++++++++++++++++++++++ 2 files changed, 60 insertions(+), 15 deletions(-) create mode 100644 autocrit/inference/vllm.sbatch diff --git a/autocrit/inference/inference_hook.py b/autocrit/inference/inference_hook.py index c62e341..a227129 100644 --- a/autocrit/inference/inference_hook.py +++ b/autocrit/inference/inference_hook.py @@ -46,6 +46,7 @@ def free(self): class vLLMHook(InferenceHook): def __init__(self, model_path, tensor_parallel_size=1, external=False, num_nodes=1): + self.init_time = time.time() self.model_path = model_path self.tensor_parallel_size = tensor_parallel_size self.external = external @@ -61,9 +62,9 @@ def __init__(self, model_path, tensor_parallel_size=1, external=False, num_nodes sbatch_script_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), "vllm.sbatch") for _ in range(num_nodes): - cmd = f"sbatch {sbatch_script_path} NUM_TP={tensor_parallel_size} MODEL_PATH={model_path} DEVICES={'.'.join(devices)}" + cmd = f"sbatch {sbatch_script_path} NUM_TP={tensor_parallel_size} MODEL_PATH={model_path} DEVICES={'|'.join(devices)}" print(f'{cmd=}') - process = subprocess.Popen(cmd.split(), stdout=subprocess.PIPE, stderr=subprocess.PIPE) + process = subprocess.Popen(cmd.split(), stdout=subprocess.PIPE, stderr=subprocess.PIPE, env={**os.environ, "TORCHELASTIC_USE_AGENT_STORE": ""}) while True: output = process.stdout.readline().decode("utf-8").strip() @@ -98,22 +99,20 @@ def __init__(self, model_path, tensor_parallel_size=1, external=False, num_nodes print(f"Loading {self.data_parallel_size} processes for {model_path}...") - while True: - all_loaded = True - for server in self.servers: + not_loaded = list(self.servers) + while not_loaded: + for server in not_loaded: try: asyncio.run(self.request_vllm_api(server=server, prompt="", max_new_tokens=1)) + not_loaded.remove(server) except aiohttp.client_exceptions.ClientConnectorError: - all_loaded = False break - if all_loaded: - print(f"Loaded {model_path}") - time.sleep(10) - break - time.sleep(1) + self.load_time = time.time() - self.init_time + print(f"Loaded {model_path} in {self.load_time:.0f}s") + async def request_vllm_api(self, prompt: str, i=0, num_return_sequences=1, temperature=0.0, max_new_tokens=512, stop=[], server=None): pload = { "prompt": prompt, @@ -128,8 +127,8 @@ async def request_vllm_api(self, prompt: str, i=0, num_return_sequences=1, tempe server = self.servers[self.nth_request % self.data_parallel_size] self.nth_request += 1 - connector = aiohttp.TCPConnector(limit_per_host=8192 * 4) - timeout = aiohttp.ClientTimeout(total=60*60) + connector = aiohttp.TCPConnector(limit_per_host=32768) + timeout = aiohttp.ClientTimeout(total=3600) async with aiohttp.ClientSession(connector=connector, timeout=timeout) as session: async with session.post(f"http://{server}/generate", json=pload) as response: try: @@ -144,7 +143,7 @@ async def generate_vllm_api(prompts, **kwargs): outputs = [self.request_vllm_api(prompt=prompt, i=i, **kwargs) for i, prompt in enumerate(prompts)] return await 
tqdm_asyncio.gather(*outputs, desc=f"Inferencing {self.model_path}") - batch_size = 8192 + batch_size = 32768 outputs = [] for i in range(0, len(prompts), batch_size): outputs += asyncio.run(generate_vllm_api(prompts[i:i+batch_size], **kwargs)) @@ -159,7 +158,6 @@ def free(self): os.kill(p.pid, signal.SIGTERM) p.communicate() print(f"Unloaded all {self.model_path} processes") - time.sleep(10) def __del__(self): self.free() diff --git a/autocrit/inference/vllm.sbatch b/autocrit/inference/vllm.sbatch new file mode 100644 index 0000000..71dcddb --- /dev/null +++ b/autocrit/inference/vllm.sbatch @@ -0,0 +1,47 @@ +#!/bin/bash +#SBATCH --job-name=vllm +#SBATCH --partition=g80 +#SBATCH --account=carperai +#SBATCH --nodes=1 +#SBATCH --ntasks-per-node=1 +#SBATCH --mem=0 +#SBATCH --cpus-per-task=64 +#SBATCH --output=%j +#SBATCH --exclusive +#SBATCH --exclude ip-26-0-158-175 + +for ARGUMENT in "$@" +do + KEY=$(echo $ARGUMENT | cut -f1 -d=) + + KEY_LENGTH=${#KEY} + VALUE="${ARGUMENT:$KEY_LENGTH+1}" + + export "$KEY"="$VALUE" +done + +# check if argument contains MODEL, NUMTP +if [ -z "$MODEL_PATH" ] || [ -z "$NUM_TP" ]; then + echo "Please provide MODEL, NUM_TP" + exit 1 +fi + +# replace '|' with ' ' for cuda devices separator to iterate over +export DEVICES=${DEVICES//|/ } +export HOSTNAMES=$(scontrol show hostnames "$SLURM_JOB_NODELIST") + +echo MODEL_PATH=$MODEL_PATH +echo NUM_TP=$NUM_TP +echo DEVICES=$DEVICES +echo HOSTNAME=$HOSTNAMES + +echo $VIRTUAL_ENV +module load cuda/11.8 + +ix=0 +for devices in $DEVICES; do + CUDA_VISIBLE_DEVICES=$devices python -m vllm.entrypoints.api_server -tp=$NUM_TP --model=$MODEL_PATH --host=0.0.0.0 --port $((8000+ix)) & + ix=$((ix+1)) +done + +wait From ce95c2533bf502c127ac4f47ef991c8f6e25f6f9 Mon Sep 17 00:00:00 2001 From: maxreciprocate <56548574+maxreciprocate@users.noreply.github.com> Date: Thu, 14 Sep 2023 18:44:21 +0300 Subject: [PATCH 06/12] refactor(inference): update docs, keep hooks under the same api --- autocrit/inference/inference_hook.py | 176 ++++++++++++++------------- 1 file changed, 94 insertions(+), 82 deletions(-) diff --git a/autocrit/inference/inference_hook.py b/autocrit/inference/inference_hook.py index a227129..ccdc812 100644 --- a/autocrit/inference/inference_hook.py +++ b/autocrit/inference/inference_hook.py @@ -1,51 +1,64 @@ -import signal -import aiohttp +import argparse import asyncio import json +import logging +import os +import signal import subprocess -import torch +import sys import time -from tqdm.asyncio import tqdm_asyncio - from abc import ABC, abstractmethod -import argparse -import os +from typing import Any, Dict, List, Optional, Tuple + +import aiohttp import torch import transformers -from typing import Tuple, Any, Optional, List, Dict import tritonclient.grpc.aio as grpcclient -from autocrit.inference.utils import triton_call, best_of_n +from autocrit.inference.utils import best_of_n, triton_call from text_generation import Client -import logging - -import signal -import sys +from tqdm.asyncio import tqdm_asyncio class InferenceHook(ABC): def __init__(self, **kwargs): """ - kwargs: a dictionary of parameters to initilize the model + Args: + kwargs (`Dict[str, Any]`): a dictionary of parameters to initilize the model with """ pass @abstractmethod - def generate(self, prompts: List[str], **kwargs: Dict[str, Any]): + def generate(self, prompts: List[str], **kwargs: Dict[str, Any]) -> List[Dict[str, Any]]: """ - prompts: inputs for generations - kwargs: parameters to control generation + Args: + prompts (`List[str]`): inputs 
for generations + kwargs (`Dict[str, Any]`): parameters to control generation + + Returns: + outputs (`List[Dict[str, Any]]`): a list of dictionaries, each dictionary contains the following keys: + id (`int`): the id of the prompt + prompt (`str`): the prompt + outputs (`List[str]`): a list of outputs per prompt """ pass @abstractmethod def free(self): """ - Clean up resources after inference + Clean up resources after the inference """ pass class vLLMHook(InferenceHook): def __init__(self, model_path, tensor_parallel_size=1, external=False, num_nodes=1): + """ + + Args: + model_path (`str`): the path to the model + tensor_parallel_size (`int`): the number of GPUs to use per one server + external (`bool`): whether to spawn a new slurm jobs and generate on external nodes or use the current node + num_nodes (`int`): the number of nodes to use if external is True + """ self.init_time = time.time() self.model_path = model_path self.tensor_parallel_size = tensor_parallel_size @@ -63,8 +76,7 @@ def __init__(self, model_path, tensor_parallel_size=1, external=False, num_nodes sbatch_script_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), "vllm.sbatch") for _ in range(num_nodes): cmd = f"sbatch {sbatch_script_path} NUM_TP={tensor_parallel_size} MODEL_PATH={model_path} DEVICES={'|'.join(devices)}" - print(f'{cmd=}') - process = subprocess.Popen(cmd.split(), stdout=subprocess.PIPE, stderr=subprocess.PIPE, env={**os.environ, "TORCHELASTIC_USE_AGENT_STORE": ""}) + process = subprocess.Popen(cmd.split(), env={**os.environ, "TORCHELASTIC_USE_AGENT_STORE": ""}) while True: output = process.stdout.readline().decode("utf-8").strip() @@ -94,7 +106,12 @@ def __init__(self, model_path, tensor_parallel_size=1, external=False, num_nodes self.processes = [] for i in range(self.data_parallel_size): cmd = f"python -m vllm.entrypoints.api_server -tp={tensor_parallel_size} --model={model_path} --port {8000+i}" - process = subprocess.Popen(cmd.split(), stdout=subprocess.PIPE, stderr=subprocess.PIPE, env={**os.environ, "CUDA_VISIBLE_DEVICES": devices[i], "TORCHELASTIC_USE_AGENT_STORE": ""}) + kwargs = {"env": {**os.environ, "CUDA_VISIBLE_DEVICES": devices[i], "TORCHELASTIC_USE_AGENT_STORE": ""}} + if not os.environ.get("DEBUG", False): + kwargs["stdout"] = subprocess.DEVNULL + kwargs["stderr"] = subprocess.DEVNULL + + process = subprocess.Popen(cmd.split(), **kwargs) self.processes.append(process) print(f"Loading {self.data_parallel_size} processes for {model_path}...") @@ -138,7 +155,7 @@ async def request_vllm_api(self, prompt: str, i=0, num_return_sequences=1, tempe return {"id": i, "prompt": prompt, "outputs": [None]} - def generate(self, prompts, **kwargs): + def generate(self, prompts: List[str], **kwargs) -> List[Dict[str, Any]]: async def generate_vllm_api(prompts, **kwargs): outputs = [self.request_vllm_api(prompt=prompt, i=i, **kwargs) for i, prompt in enumerate(prompts)] return await tqdm_asyncio.gather(*outputs, desc=f"Inferencing {self.model_path}") @@ -152,19 +169,30 @@ async def generate_vllm_api(prompts, **kwargs): def free(self): if self.external: - subprocess.run(f"scancel {' '.join(self.job_ids)}".split()) + if self.job_ids: + subprocess.run(f"scancel {' '.join(self.job_ids)}".split()) + self.job_ids = [] else: for p in self.processes: os.kill(p.pid, signal.SIGTERM) p.communicate() print(f"Unloaded all {self.model_path} processes") + self.processes = [] def __del__(self): self.free() -# Inference hook that uses the HuggingFace API to call a model class HuggingFaceHook(InferenceHook): 
+ """ + Inference hook that uses plain HuggingFace transformers API + """ + def __init__(self, model_path: str, tokenizer_path : Optional[str] = None): + """ + Args: + model_path (`str`): the directory of the model + tokenizer_path (`str`): the directory of the tokenizer, if None, the `model_path` is implied + """ self.model = transformers.AutoModelForCausalLM.from_pretrained(model_path, device_map="auto") self.tokenizer = transformers.AutoTokenizer.from_pretrained(tokenizer_path or model_path) if self.tokenizer.pad_token is None: @@ -199,101 +227,89 @@ def free(self): del self.model del self.tokenizer -# Inference hook that uses the HuggingFace API to call a model. Uses the best of N sampling method class HuggingFaceHookBestOfN(HuggingFaceHook): - def __init__(self, dir : str, tokenizer_name : Optional[str] = None): + """ + Inference hook that uses the HuggingFace API to call a model. Uses the best of N sampling method + """ + + def __init__(self, model_path: str, tokenizer_path : Optional[str] = None): """ - dir: the directory of the model - tokenizer_name: the name of the tokenizer to use, if None, use the model name + Args: + model_path (`str`): the directory of the model + tokenizer_path (`str`): the directory of the tokenizer, if None, the `model_path` is implied """ - super().__init__(dir, tokenizer_name) + super().__init__(model_path, tokenizer_path) - def infer(self, input_texts : List[str], - generate_params : Dict[str, Any], - **kwargs: Any) -> Any: + def generate(self, prompts: List[str], **kwargs: Any) -> List[str]: """ - input_texts: a list of strings, each string is a prompt - generate_params: a dictionary of parameters to pass to the generate function - kwargs: any additional arguments to pass to the generate function - returns: a list of strings, each string is a generated output + Args: + prompts: a list of prompts to generate + kwargs: a dictionary of parameters to pass to the generate function """ - output_txt = best_of_n(self.model, self.tokenizer, input_texts, gen_kwargs=generate_params, **kwargs) + output_txt = best_of_n(self.model, self.tokenizer, prompts, gen_kwargs=kwargs) return output_txt class TritonHook(InferenceHook): - def __init__(self, dir : str, model_name : str, tokenizer_name : Optional[str] = None): + def __init__(self, url: str, model_path: str, tokenizer_path : Optional[str] = None): """ - dir: location of the triton server - model_name: the name of the model to use - tokenizer_name: the name of the tokenizer to use, if None, use the model name + Args: + url (`str`): location of the triton server + model_path (`str`): the name of the model to use + tokenizer_path (`str`): the name of the tokenizer to use, if None, the `model_path` is implied """ - super().__init__(dir) - self.url = dir # url contains host:port + self.url = url # url contains host:port # TODO: if URL is a path to a triton model, we shold load the model and launch the server - self.model_name = model_name - if tokenizer_name is None: - self.tokenizer_name = model_name + self.model_path = model_path + if tokenizer_path is None: + self.tokenizer_path = model_path else: - self.tokenizer_name = tokenizer_name + self.tokenizer_path = tokenizer_path - self.client = None - self.tokenizer = None - - def load(self, **kwargs): # create a client using url self.client = grpcclient.InferenceServerClient(url=self.url) - self.tokenizer = transformers.AutoTokenizer.from_pretrained(self.tokenizer_name) + self.tokenizer = transformers.AutoTokenizer.from_pretrained(self.tokenizer_path) # check if there 
is a padding token, if not add one if self.tokenizer.pad_token is None: self.tokenizer.add_special_tokens({"pad_token": "[PAD]"}) - def infer(self, input_texts : List[str], - generate_params : Dict[str, Any], - **kwargs: Any) -> Any: + def generate(self, prompts: List[str], **kwargs: Any) -> List[str]: """ - input_texts: a list of strings, each string is a prompt - generate_params: a dictionary of parameters to pass to the generate function - kwargs: any additional arguments to pass to the generate function - returns: a list of strings, each string is a generated output + Args: + prompts: a list of prompts to generate + kwargs: a dictionary of parameters to pass to the generate function """ # use self.client to call the model # assume input and output are batched - inp_text = input_texts - inps = self.tokenizer(inp_text, return_tensors="pt", padding=True) + inps = self.tokenizer(prompts, return_tensors="pt", padding=True) # call infer - logits, output_txt = triton_call(self.client, self.model_name, inps.input_ids, **generate_params) + logits, output_txt = triton_call(self.client, self.model_path, inps.input_ids, **kwargs) if not "no_decode" in kwargs: output_txt = self.tokenizer.batch_decode(output_txt, skip_special_tokens=True) # check if logits are needed - if generate_params["return_logits"]: + if kwargs["return_logits"]: return output_txt, logits else: return output_txt -# Inference hook that uses the HuggingFace API to call a model class TextGenerationHook(InferenceHook): - def __init__(self, dir : str): - super().__init__(dir) - self.model_name = dir - self.client = None - - def load(self, **kwargs): + def __init__(self, model_path: str): + self.model_path = model_path # get num shards and port, used for the launcher script num_shards = kwargs.get("num_shards", 1) port = kwargs.get("port", 8080) # check if model name is a URL - if not self.model_name.startswith("http"): + if not self.model_path.startswith("http"): # launch the model using the model name and text_generation_launcher.sh # The following line runs the launcher script - output = subprocess.run(["sbatch./launch.sbatch MODEL_NAME="+str(self.model_name) + " NUM_SHARD="+str(num_shards) + " PORT="+str(port)], capture_output=True) + output = subprocess.run(["sbatch./launch.sbatch MODEL_NAME="+str(self.model_path) + " NUM_SHARD="+str(num_shards) + " PORT="+str(port)], capture_output=True) logging.info(output.stdout.decode("utf-8")) # check return code if output.returncode != 0: @@ -317,22 +333,18 @@ def load(self, **kwargs): # Create the client self.client = Client(f"http://{ip}:{port}") else: - self.client = Client(self.model_name) + self.client = Client(self.model_path) - def infer(self, input_texts : List[str], - generate_params : Dict[str, Any], - **kwargs: Any) -> Any: + def generate(self, prompts: List[str], **kwargs: Dict[str, Any]) -> List[str]: """ - input_texts: a list of strings, each string is a prompt - generate_params: a dictionary of parameters to pass to the generate function - kwargs: any additional arguments to pass to the generate function - returns: a list of strings, each string is a generated output + Args: + prompts (`List[str]`): a list of strings, each string is a prompt + kwargs (`Dict[str, Any]`): a dictionary of parameters to pass to the generate function """ # if input_texts is a list, convert it to a string if isinstance(input_texts, list): input_texts = input_texts[0] # use self.client to call the model - output_txt = self.client.generate(input_texts, **generate_params).generated_text - 
#print(output_txt)
+        output_txt = self.client.generate(input_texts, **kwargs).generated_text
         return output_txt

From 2a2b6922daf83290e3ceda557ca2f9c9d9f5ba06 Mon Sep 17 00:00:00 2001
From: maxreciprocate <56548574+maxreciprocate@users.noreply.github.com>
Date: Fri, 15 Sep 2023 15:28:15 +0300
Subject: [PATCH 07/12] fix(inference): heartbeat messages should not be empty
 for models that don't prepend bos token

---
 autocrit/inference/inference_hook.py | 18 +++++++++---------
 1 file changed, 9 insertions(+), 9 deletions(-)

diff --git a/autocrit/inference/inference_hook.py b/autocrit/inference/inference_hook.py
index ccdc812..cda7904 100644
--- a/autocrit/inference/inference_hook.py
+++ b/autocrit/inference/inference_hook.py
@@ -50,33 +50,33 @@ def free(self):
         pass
 
 class vLLMHook(InferenceHook):
-    def __init__(self, model_path, tensor_parallel_size=1, external=False, num_nodes=1):
+    def __init__(self, model_path, tensor_parallel_size=1, num_external_nodes=0):
         """
+        Starts data parallel vLLM servers either locally or on separate nodes by spawning slurm jobs
 
         Args:
             model_path (`str`): the path to the model
             tensor_parallel_size (`int`): the number of GPUs to use per one server
-            external (`bool`): whether to spawn a new slurm jobs and generate on external nodes or use the current node
-            num_nodes (`int`): the number of nodes to use if external is True
+            num_external_nodes (`int`): spawn this many slurm jobs for the servers, if `0`, use only local resources
         """
         self.init_time = time.time()
         self.model_path = model_path
         self.tensor_parallel_size = tensor_parallel_size
-        self.external = external
+        self.num_external_nodes = num_external_nodes
         self.nth_request = 0
 
         devices = list(map(str, range(torch.cuda.device_count())))
         devices = [",".join(devices[i*tensor_parallel_size:(i+1)*tensor_parallel_size]) for i in range(len(devices) // tensor_parallel_size)]
 
-        if external:
+        if num_external_nodes:
             self.job_ids = []
             self.servers = []
             self.data_parallel_size = torch.cuda.device_count() * num_nodes // tensor_parallel_size
 
             sbatch_script_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), "vllm.sbatch")
-            for _ in range(num_nodes):
+            for _ in range(num_external_nodes):
                 cmd = f"sbatch {sbatch_script_path} NUM_TP={tensor_parallel_size} MODEL_PATH={model_path} DEVICES={'|'.join(devices)}"
-                process = subprocess.Popen(cmd.split(), env={**os.environ, "TORCHELASTIC_USE_AGENT_STORE": ""})
+                process = subprocess.Popen(cmd.split(), stdout=subprocess.PIPE, env={**os.environ, "TORCHELASTIC_USE_AGENT_STORE": ""})
 
             while True:
                 output = process.stdout.readline().decode("utf-8").strip()
@@ -120,7 +120,7 @@ def __init__(self, model_path, tensor_parallel_size=1, num_external_nodes=0):
         while not_loaded:
             for server in not_loaded:
                 try:
-                    asyncio.run(self.request_vllm_api(server=server, prompt="", max_new_tokens=1))
+                    asyncio.run(self.request_vllm_api(server=server, prompt=".", max_new_tokens=1))
                     not_loaded.remove(server)
                 except aiohttp.client_exceptions.ClientConnectorError:
                     break
@@ -168,7 +168,7 @@ async def generate_vllm_api(prompts, **kwargs):
         return outputs
 
     def free(self):
-        if self.external:
+        if self.num_external_nodes:
            if self.job_ids:
                subprocess.run(f"scancel {' '.join(self.job_ids)}".split())
                self.job_ids = []

From a352672ae04e80366de35ce1a4f2bf7e15d86f90 Mon Sep 17 00:00:00 2001
From: maxreciprocate <56548574+maxreciprocate@users.noreply.github.com>
Date: Fri, 15 Sep 2023 18:30:01 +0300
Subject: [PATCH 08/12] refactor(inference): `num_nodes` -> `num_external_nodes`

---
 autocrit/inference/inference_hook.py | 2 
+- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/autocrit/inference/inference_hook.py b/autocrit/inference/inference_hook.py index cda7904..16fd4ac 100644 --- a/autocrit/inference/inference_hook.py +++ b/autocrit/inference/inference_hook.py @@ -71,7 +71,7 @@ def __init__(self, model_path, tensor_parallel_size=1, num_external_nodes=0): if num_external_nodes: self.job_ids = [] self.servers = [] - self.data_parallel_size = torch.cuda.device_count() * num_nodes // tensor_parallel_size + self.data_parallel_size = torch.cuda.device_count() * num_external_nodes // tensor_parallel_size sbatch_script_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), "vllm.sbatch") for _ in range(num_external_nodes): From 35a86f1a11d69ad70d7f15c4f9dad3a9dae6b8ab Mon Sep 17 00:00:00 2001 From: maxreciprocate <56548574+maxreciprocate@users.noreply.github.com> Date: Thu, 21 Sep 2023 19:58:51 +0300 Subject: [PATCH 09/12] chore(inference): keep vllm logs in a separate folder --- autocrit/inference/inference_hook.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/autocrit/inference/inference_hook.py b/autocrit/inference/inference_hook.py index 16fd4ac..0f6bda5 100644 --- a/autocrit/inference/inference_hook.py +++ b/autocrit/inference/inference_hook.py @@ -87,10 +87,10 @@ def __init__(self, model_path, tensor_parallel_size=1, num_external_nodes=0): if output.startswith("Submitted batch job"): self.job_ids.append(output.split()[-1].strip()) - while not os.path.exists(f"{self.job_ids[-1]}"): + while not os.path.exists(f"vllm_logs/{self.job_ids[-1]}"): time.sleep(1) - with open(f"{self.job_ids[-1]}") as log: + with open(f"vllm_logs/{self.job_ids[-1]}") as log: while True: output = log.readline().strip() if output: From 19327c69e21efddcaeb1548fb676ece8a54de980 Mon Sep 17 00:00:00 2001 From: maxreciprocate <56548574+maxreciprocate@users.noreply.github.com> Date: Thu, 28 Sep 2023 20:11:46 +0300 Subject: [PATCH 10/12] feat(inference): add RewardHook --- autocrit/inference/inference_hook.py | 107 ++++++++++++++++++++++++--- 1 file changed, 96 insertions(+), 11 deletions(-) diff --git a/autocrit/inference/inference_hook.py b/autocrit/inference/inference_hook.py index 0f6bda5..6654788 100644 --- a/autocrit/inference/inference_hook.py +++ b/autocrit/inference/inference_hook.py @@ -1,3 +1,4 @@ +import math import argparse import asyncio import json @@ -49,6 +50,10 @@ def free(self): """ pass + def __del__(self): + self.free() + + class vLLMHook(InferenceHook): def __init__(self, model_path, tensor_parallel_size=1, num_external_nodes=0): """ @@ -179,8 +184,84 @@ def free(self): print(f"Unloaded all {self.model_path} processes") self.processes = [] - def __del__(self): - self.free() + +class RewardHook(InferenceHook): + def __init__(self, model_path: str): + self.init_time = time.time() + self.model_path = model_path + self.nth_request = 0 + + tensor_parallel_size = 1 + devices = list(map(str, range(torch.cuda.device_count()))) + devices = [",".join(devices[i*tensor_parallel_size:(i+1)*tensor_parallel_size]) for i in range(len(devices) // tensor_parallel_size)] + + self.data_parallel_size = torch.cuda.device_count() // tensor_parallel_size + self.servers = [f"localhost:{8000+i}" for i in range(self.data_parallel_size)] + self.processes = [] + for i in range(self.data_parallel_size): + server_script_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), "server.py") + cmd = f"python {server_script_path} --model {model_path} --port {8000+i}" + kwargs = {"env": {**os.environ, 
"CUDA_VISIBLE_DEVICES": devices[i], "TORCHELASTIC_USE_AGENT_STORE": ""}} + if not os.environ.get("DEBUG", False): + kwargs["stdout"] = subprocess.DEVNULL + kwargs["stderr"] = subprocess.DEVNULL + + process = subprocess.Popen(cmd.split(), **kwargs) + self.processes.append(process) + + print(f"Loading {self.data_parallel_size} processes for {model_path}...") + not_loaded = list(self.servers) + while not_loaded: + for server in not_loaded: + try: + asyncio.run(self.request_api(server=server, samples=None)) + not_loaded.remove(server) + except aiohttp.client_exceptions.ClientConnectorError: + break + + time.sleep(1) + + async def request_api(self, samples: List[str], server=None, batch_size=8, max_length=2048): + pload = { + "samples": samples, + "batch_size": batch_size, + "max_length": max_length, + } + + if server is None: + server = self.servers[self.nth_request % self.data_parallel_size] + self.nth_request += 1 + + connector = aiohttp.TCPConnector(limit_per_host=32768) + timeout = aiohttp.ClientTimeout(total=3600) + async with aiohttp.ClientSession(connector=connector, timeout=timeout) as session: + async with session.post(f"http://{server}/score", json=pload) as response: + try: + data = await response.json() + return data + except aiohttp.client.ContentTypeError: + return None + + def score(self, samples: List[str], **kwargs) -> List[Dict[str, Any]]: + async def score_async_api(samples, **kwargs): + batch_size = math.ceil(len(samples) / self.data_parallel_size) + samples_batched = [samples[i*batch_size:(i+1)*batch_size] for i in range(self.data_parallel_size)] + outputs = [self.request_api(samples_batched[i], **kwargs) for i in range(self.data_parallel_size)] + + return await tqdm_asyncio.gather(*outputs, desc=f"Inferencing {self.model_path}") + + return sum(asyncio.run(score_async_api(samples, **kwargs)), []) + + def generate(self, prompts: List[str], **kwargs) -> List[Dict[str, Any]]: + return self.score(prompts, **kwargs) + + def free(self): + for p in self.processes: + os.kill(p.pid, signal.SIGTERM) + p.communicate() + print(f"Unloaded all {self.model_path} processes") + self.processes = [] + class HuggingFaceHook(InferenceHook): """ @@ -195,30 +276,34 @@ def __init__(self, model_path: str, tokenizer_path : Optional[str] = None): """ self.model = transformers.AutoModelForCausalLM.from_pretrained(model_path, device_map="auto") self.tokenizer = transformers.AutoTokenizer.from_pretrained(tokenizer_path or model_path) + self.tokenizer.padding_side = "left" + self.tokenizer.truncation_side = "left" if self.tokenizer.pad_token is None: self.tokenizer.add_special_tokens({"pad_token": "[PAD]"}) @torch.inference_mode() def generate(self, prompts: List[str], **kwargs: Any) -> List[str]: stop = kwargs.pop("stop", []) + max_length = kwargs.get("max_length", 2048) + max_new_tokens = kwargs.get("max_new_tokens", 512) + temperature = kwargs.get("temperature", 1.0) + num_return_sequences = kwargs.get("num_return_sequences", 1) - inputs = self.tokenizer(prompts, return_tensors="pt", padding=True, truncation=True, max_length=kwargs.get("max_length", 2048)).to(self.model.device) - - all_ids = self.model.generate(**inputs, pad_token_id=self.tokenizer.pad_token_id, eos_token_id=self.tokenizer.eos_token_id, **kwargs) - output_ids = all_ids[:, inputs.input_ids.shape[1]:] - - if "no_decode" in kwargs: - return output + outputs = [] + for i in range(len(prompts)): + inputs = self.tokenizer([prompts[i]], return_tensors="pt", padding=True, truncation=True, max_length=max_length).to(self.model.device) - 
outputs = self.tokenizer.batch_decode(output_ids, skip_special_tokens=True) + all_ids = self.model.generate(**inputs, pad_token_id=self.tokenizer.pad_token_id, eos_token_id=self.tokenizer.eos_token_id, max_new_tokens=max_new_tokens, do_sample=temperature > 0, temperature=temperature, num_return_sequences=num_return_sequences) + output_ids = all_ids[:, inputs.input_ids.shape[1]:] + outputs.extend(output_ids) + outputs = self.tokenizer.batch_decode(outputs, skip_special_tokens=True) for i in range(len(outputs)): for s in stop: if s in outputs[i]: outputs[i] = outputs[i][:outputs[i].index(s)] - num_return_sequences = kwargs.get("num_return_sequences", 1) outputs = [{"id": i, "prompt": p, "outputs": outputs[i*num_return_sequences:(i+1)*num_return_sequences]} for i, p in enumerate(prompts)] return outputs From f9205ca0ee558e3628e0f068f580f3207ea6bbdb Mon Sep 17 00:00:00 2001 From: maxreciprocate <56548574+maxreciprocate@users.noreply.github.com> Date: Thu, 5 Oct 2023 19:12:46 +0300 Subject: [PATCH 11/12] feat(inference): add openai hook --- autocrit/inference/__init__.py | 2 +- autocrit/inference/inference_hook.py | 50 ++++++++++++++++++++++++++-- autocrit/inference/vllm.sbatch | 5 ++- 3 files changed, 51 insertions(+), 6 deletions(-) diff --git a/autocrit/inference/__init__.py b/autocrit/inference/__init__.py index 93f0ae3..72154a0 100644 --- a/autocrit/inference/__init__.py +++ b/autocrit/inference/__init__.py @@ -1 +1 @@ -from .inference_hook import vLLMHook, HuggingFaceHook +from .inference_hook import vLLMHook, HuggingFaceHook, RewardHook, OpenAIHook diff --git a/autocrit/inference/inference_hook.py b/autocrit/inference/inference_hook.py index 6654788..c527bba 100644 --- a/autocrit/inference/inference_hook.py +++ b/autocrit/inference/inference_hook.py @@ -10,6 +10,8 @@ import time from abc import ABC, abstractmethod from typing import Any, Dict, List, Optional, Tuple +from tqdm import tqdm +import openai import aiohttp import torch @@ -110,7 +112,7 @@ def __init__(self, model_path, tensor_parallel_size=1, num_external_nodes=0): self.servers = [f"localhost:{8000+i}" for i in range(self.data_parallel_size)] self.processes = [] for i in range(self.data_parallel_size): - cmd = f"python -m vllm.entrypoints.api_server -tp={tensor_parallel_size} --model={model_path} --port {8000+i}" + cmd = f"python -m vllm.entrypoints.api_server -tp={tensor_parallel_size} --model={model_path} --port {8000+i} --dtype bfloat16" kwargs = {"env": {**os.environ, "CUDA_VISIBLE_DEVICES": devices[i], "TORCHELASTIC_USE_AGENT_STORE": ""}} if not os.environ.get("DEBUG", False): kwargs["stdout"] = subprocess.DEVNULL @@ -135,7 +137,7 @@ def __init__(self, model_path, tensor_parallel_size=1, num_external_nodes=0): self.load_time = time.time() - self.init_time print(f"Loaded {model_path} in {self.load_time:.0f}s") - async def request_vllm_api(self, prompt: str, i=0, num_return_sequences=1, temperature=0.0, max_new_tokens=512, stop=[], server=None): + async def request_vllm_api(self, prompt: str, i=0, num_return_sequences=1, temperature=0.0, max_new_tokens=512, stop=[], server=None, **kwargs): pload = { "prompt": prompt, "n": num_return_sequences, @@ -184,6 +186,50 @@ def free(self): print(f"Unloaded all {self.model_path} processes") self.processes = [] +def generate_openai(prompt, model_path="gpt-3.5-turbo", max_new_tokens=128, system_prompt="", temperature=1, stop=[], num_return_sequences=1, **kwargs): + MAX_API_RETRY = 5 + for _ in range(MAX_API_RETRY): + try: + if model_path not in ['gpt-3.5-turbo', 'gpt-4']: + kwargs = 
{"deployment_id": model_path} + else: + kwargs = {"model": model_path, "stop": stop} + + response = openai.ChatCompletion.create( + messages=[ + {"role": "system", "content": system_prompt}, + {"role": "user", "content": prompt}, + ], + temperature=temperature, + max_tokens=max_new_tokens, + n=num_return_sequences, + **kwargs, + ) + + return [response["choices"][ix]["message"]["content"] for ix in range(num_return_sequences)] + + except Exception as e: + print(e) + time.sleep(10) + + return [None] + +class OpenAIHook(InferenceHook): + def __init__(self, model_path: str): + self.model_path = model_path + + if not os.environ.get("OPENAI_API_KEY") and openai.api_key is None: + raise RuntimeError("OPENAI_API_KEY is not set") + + def generate(self, prompts: List[str], **kwargs) -> List[str]: + outputs = [] + for prompt in tqdm(prompts, desc=f"Inferencing {self.model_path}"): + outputs.append(generate_openai(prompt, model_path=self.model_path, **kwargs)) + + return [{"id": i, "prompt": prompt, "outputs": outputs[i]} for i, prompt in enumerate(prompts)] + + def free(self): + pass class RewardHook(InferenceHook): def __init__(self, model_path: str): diff --git a/autocrit/inference/vllm.sbatch b/autocrit/inference/vllm.sbatch index 71dcddb..4198a12 100644 --- a/autocrit/inference/vllm.sbatch +++ b/autocrit/inference/vllm.sbatch @@ -1,14 +1,13 @@ #!/bin/bash #SBATCH --job-name=vllm #SBATCH --partition=g80 -#SBATCH --account=carperai +#SBATCH --account=stability #SBATCH --nodes=1 #SBATCH --ntasks-per-node=1 #SBATCH --mem=0 #SBATCH --cpus-per-task=64 -#SBATCH --output=%j +#SBATCH --output=vllm_logs/%j #SBATCH --exclusive -#SBATCH --exclude ip-26-0-158-175 for ARGUMENT in "$@" do From b1a3693f0392c0f68d6b6c6f041fd091c81e2e1c Mon Sep 17 00:00:00 2001 From: maxreciprocate <56548574+maxreciprocate@users.noreply.github.com> Date: Fri, 13 Oct 2023 16:29:31 +0300 Subject: [PATCH 12/12] fix(vllm.sbatch): do `ray stop` & lower `batch_size` --- autocrit/inference/inference_hook.py | 6 +++--- autocrit/inference/vllm.sbatch | 4 +++- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/autocrit/inference/inference_hook.py b/autocrit/inference/inference_hook.py index c527bba..09c89fd 100644 --- a/autocrit/inference/inference_hook.py +++ b/autocrit/inference/inference_hook.py @@ -112,7 +112,7 @@ def __init__(self, model_path, tensor_parallel_size=1, num_external_nodes=0): self.servers = [f"localhost:{8000+i}" for i in range(self.data_parallel_size)] self.processes = [] for i in range(self.data_parallel_size): - cmd = f"python -m vllm.entrypoints.api_server -tp={tensor_parallel_size} --model={model_path} --port {8000+i} --dtype bfloat16" + cmd = f"python -m vllm.entrypoints.api_server -tp={tensor_parallel_size} --model={model_path} --port {8000+i}" kwargs = {"env": {**os.environ, "CUDA_VISIBLE_DEVICES": devices[i], "TORCHELASTIC_USE_AGENT_STORE": ""}} if not os.environ.get("DEBUG", False): kwargs["stdout"] = subprocess.DEVNULL @@ -137,7 +137,7 @@ def __init__(self, model_path, tensor_parallel_size=1, num_external_nodes=0): self.load_time = time.time() - self.init_time print(f"Loaded {model_path} in {self.load_time:.0f}s") - async def request_vllm_api(self, prompt: str, i=0, num_return_sequences=1, temperature=0.0, max_new_tokens=512, stop=[], server=None, **kwargs): + async def request_vllm_api(self, prompt: str, i=0, num_return_sequences=1, temperature=1.0, max_new_tokens=512, stop=[], server=None, **kwargs): pload = { "prompt": prompt, "n": num_return_sequences, @@ -167,7 +167,7 @@ async def 
generate_vllm_api(prompts, **kwargs): outputs = [self.request_vllm_api(prompt=prompt, i=i, **kwargs) for i, prompt in enumerate(prompts)] return await tqdm_asyncio.gather(*outputs, desc=f"Inferencing {self.model_path}") - batch_size = 32768 + batch_size = 16384 outputs = [] for i in range(0, len(prompts), batch_size): outputs += asyncio.run(generate_vllm_api(prompts[i:i+batch_size], **kwargs)) diff --git a/autocrit/inference/vllm.sbatch b/autocrit/inference/vllm.sbatch index 4198a12..c0f3d6b 100644 --- a/autocrit/inference/vllm.sbatch +++ b/autocrit/inference/vllm.sbatch @@ -1,6 +1,6 @@ #!/bin/bash #SBATCH --job-name=vllm -#SBATCH --partition=g80 +#SBATCH --partition=g40 #SBATCH --account=stability #SBATCH --nodes=1 #SBATCH --ntasks-per-node=1 @@ -9,6 +9,8 @@ #SBATCH --output=vllm_logs/%j #SBATCH --exclusive +ray stop + for ARGUMENT in "$@" do KEY=$(echo $ARGUMENT | cut -f1 -d=)
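
A minimal usage sketch of the hooks introduced in this series (illustrative only, not part of any
patch; the model path, prompt, and generation parameters below are assumptions):

    from autocrit.inference import vLLMHook

    # Spawns one vLLM api_server per GPU (or per tensor-parallel group) on the local node,
    # since num_external_nodes defaults to 0.
    hook = vLLMHook(model_path="EleutherAI/pythia-2.8b", tensor_parallel_size=1)

    # generate() returns one dict per prompt: {"id": ..., "prompt": ..., "outputs": [...]}.
    results = hook.generate(["Question: What is vLLM?\nAnswer:"], max_new_tokens=64, temperature=0.0)
    print(results[0]["outputs"][0])

    # Kills the spawned api_server processes (or scancels the slurm jobs when num_external_nodes > 0).
    hook.free()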