python[minor] update evaluate to be concurrent #1345
Conversation
@@ -642,6 +651,61 @@ async def astart(self) -> _AsyncExperimentManager:
            upload_results=self._upload_results,
        )

    async def awith_predictions_and_evaluators(
We could probably do something similar to what we do in the sync version to avoid having to duplicate logic here (basically share a semaphore).
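A minimal sketch of that idea, assuming a single shared `asyncio.Semaphore` gates both phases (all names here are illustrative stand-ins, not the actual SDK internals):

```python
import asyncio

MAX_CONCURRENCY = 4  # illustrative; would come from the caller's config
semaphore = asyncio.Semaphore(MAX_CONCURRENCY)

async def predict(example: str) -> str:
    await asyncio.sleep(0.1)  # stand-in for the real target function
    return f"prediction({example})"

async def evaluate(prediction: str) -> str:
    await asyncio.sleep(0.1)  # stand-in for a real evaluator
    return f"score({prediction})"

async def run_one(example: str) -> str:
    # Both phases acquire the same semaphore, so the total in-flight
    # work across predict and evaluate never exceeds MAX_CONCURRENCY.
    async with semaphore:
        prediction = await predict(example)
    async with semaphore:
        return await evaluate(prediction)

async def main() -> None:
    print(await asyncio.gather(*(run_one(f"ex{i}") for i in range(10))))

asyncio.run(main())
```

Because both phases acquire the same semaphore, the concurrency cap applies to the pipeline as a whole rather than separately per phase.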
        evaluators = _resolve_evaluators(evaluators)

        if not hasattr(self, "_evaluator_executor"):
            self._evaluator_executor = cf.ThreadPoolExecutor(max_workers=4)
ooc, where does the 4 come from?
I copied the value from _ascore; not really sure beyond that.
        )
        async with lock:
Could we just return the selected_results in _run_single_evaluator and construct the eval_results after the asyncio.gather, to avoid needing the lock?
Should be fixed, but someone should check that I did it correctly.
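For reference, the lock-free shape being suggested, sketched with hypothetical names (`run_single_evaluator` stands in for the real helper):

```python
import asyncio

async def run_single_evaluator(name: str) -> tuple[str, list[int]]:
    # Hypothetical stand-in: do the work and *return* the selected
    # results instead of writing them into shared state under a lock.
    await asyncio.sleep(0.1)
    return name, [1, 2, 3]

async def main() -> None:
    names = ["exactness", "conciseness", "helpfulness"]
    # Each task owns its own return value, so no lock is needed;
    # eval_results is constructed once, after all tasks complete.
    pairs = await asyncio.gather(*(run_single_evaluator(n) for n in names))
    eval_results = dict(pairs)
    print(eval_results)

asyncio.run(main())
```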
        {
            name: {
                "presigned_url": value["presigned_url"],
                "reader": io.BytesIO(value["reader"].getvalue()),
would love @agola11's input on this bit
        new_attachments[name] = {
            "presigned_url": attachment["presigned_url"],
            "reader": io.BytesIO(
                self._attachment_raw_data_dict[str(example.id) + name]
you're sure this doesn't copy the bytes?
No, you are correct: io.BytesIO copies the underlying bytes. This is wrong; I am working on a fix right now.
Ehh, actually I am going to walk back my statement: based on testing, I don't think BytesIO copies the data.
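One quick way to probe this (a measurement sketch, not a proof of CPython internals; on CPython, `io.BytesIO(initial_bytes)` uses a copy-on-write buffer and only copies once the stream is written to):

```python
import io
import tracemalloc

data = b"x" * (100 * 1024 * 1024)  # 100 MB payload, allocated up front

tracemalloc.start()
buf = io.BytesIO(data)  # wrap the existing bytes without writing
_, peak = tracemalloc.get_traced_memory()
tracemalloc.stop()

# On CPython this prints roughly 0 MB: the BytesIO shares the immutable
# bytes object and would only copy it on the first write to the stream.
print(f"peak allocation while wrapping: {peak / 1e6:.1f} MB")
```

If the constructor did copy, the peak would be on the order of the 100 MB payload instead.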
@@ -617,7 +617,6 @@ def summary_eval_outputs_reference(outputs, reference_outputs):
    tolerance = 3
    assert total_slow < tolerance
    assert total_quick > (SPLIT_SIZE * NUM_REPETITIONS - 1) - tolerance
-   assert any([d > 1 for d in deltas])
@hinthornw I made this change to pass CI, but I would appreciate your review.
It's basically meant to test that we aren't iterating in two phases (predict everything, then evaluate) but instead doing predict -> evaluate as a continuous, eager stream.
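To illustrate the distinction that comment describes (toy code, not the actual test):

```python
import asyncio

async def predict(i: int) -> int:
    await asyncio.sleep(0.1)  # stand-in for slow prediction
    return i

async def evaluate(pred: int) -> str:
    await asyncio.sleep(0.1)  # stand-in for evaluation
    return f"score({pred})"

async def two_phase(n: int) -> list[str]:
    # Phase 1 fully completes before phase 2 starts: no evaluation
    # can begin until the slowest prediction is done.
    preds = await asyncio.gather(*(predict(i) for i in range(n)))
    return await asyncio.gather(*(evaluate(p) for p in preds))

async def eager(n: int) -> list[str]:
    # Each example flows predict -> evaluate independently, so early
    # examples finish evaluating while later predictions still run.
    async def pipeline(i: int) -> str:
        return await evaluate(await predict(i))
    return await asyncio.gather(*(pipeline(i) for i in range(n)))

print(asyncio.run(eager(5)))
```

The removed assertion was checking the timing deltas that distinguish these two shapes.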
I don't think we should remove this test
    return schemas.Example(
        id=example.id,
        created_at=example.created_at,
        dataset_id=example.dataset_id,
Seems likely that we'll forget to update this when we add a new field. If we didn't add a test for this in the previous version, I'd like one.
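A hedged sketch of such a test, assuming `schemas.Example` is a Pydantic v2 model (`COPIED_FIELDS` and the test name are hypothetical; the field list would mirror the helper's kwargs):

```python
from langsmith import schemas

# Hypothetical field list: would mirror the kwargs the clone helper
# above passes to schemas.Example.
COPIED_FIELDS = {
    "id",
    "created_at",
    "dataset_id",
    # ...and the rest of the kwargs in the helper
}

def test_example_clone_copies_every_field() -> None:
    # Fails as soon as schemas.Example grows a field the helper
    # doesn't copy (model_fields assumes Pydantic v2).
    missing = set(schemas.Example.model_fields) - COPIED_FIELDS
    assert not missing, f"clone helper not updated for: {missing}"
```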
        manager = await manager.awith_summary_evaluators(summary_evaluators)
    if evaluators:
        # Run predictions and evaluations in a single pipeline
        manager = await manager.awith_predictions_and_evaluators(
If predictions are streamed out, do we need a separate method?