diff --git a/xgboost_ray/examples/simple_ray_dataset.py b/xgboost_ray/examples/simple_ray_dataset.py
index 8fee2930..c38be826 100644
--- a/xgboost_ray/examples/simple_ray_dataset.py
+++ b/xgboost_ray/examples/simple_ray_dataset.py
@@ -3,11 +3,13 @@
 import numpy as np
 import pandas as pd
 import ray
 
+from xgboost import DMatrix
 from xgboost_ray import RayDMatrix, RayParams, train
 
 
 def main(cpus_per_actor, num_actors):
+    np.random.seed(1234)
     # Generate dataset
     x = np.repeat(range(8), 16).reshape((32, 4))
     # Even numbers --> 0, odd numbers --> 1
@@ -22,16 +24,7 @@ def main(cpus_per_actor, num_actors):
     data.columns = [str(c) for c in data.columns]
     data["label"] = y
 
-    # There was recent API change - the first clause covers the new
-    # and current Ray master API
-    if hasattr(ray.data, "from_pandas_refs"):
-        # Generate Ray dataset from 4 partitions
-        ray_ds = ray.data.from_pandas(data).repartition(num_actors)
-    else:
-        # Split into 4 partitions
-        partitions = [ray.put(part) for part in np.split(data, num_actors)]
-        ray_ds = ray.data.from_pandas(partitions)
-
+    ray_ds = ray.data.from_pandas(data)
     train_set = RayDMatrix(ray_ds, "label")
 
     evals_result = {}
@@ -62,6 +55,12 @@ def main(cpus_per_actor, num_actors):
     bst.save_model(model_path)
     print("Final training error: {:.4f}".format(evals_result["train"]["error"][-1]))
 
+    # Distributed prediction
+    scored = ray_ds.drop_columns(["label"]).map_batches(
+        lambda batch: {"pred": bst.predict(DMatrix(batch))}, batch_format="pandas"
+    )
+    print(scored.to_pandas())
+
 
 if __name__ == "__main__":
     parser = argparse.ArgumentParser()