Skip to content

Commit

Permalink
Refactor reconstruct_distribution
Browse files Browse the repository at this point in the history
  • Loading branch information
mar-be committed May 17, 2024
1 parent fb133d3 commit 53ee2b2
Showing 1 changed file with 79 additions and 61 deletions.
140 changes: 79 additions & 61 deletions app/CKT_cutter.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,60 +103,20 @@ def automatic_cut(
}


def _find_cuts(circuit, optimization_settings, device_constraints):
try:
return find_cuts(circuit, optimization_settings, device_constraints)
except ValueError:
return find_cuts(circuit.decompose(), optimization_settings, device_constraints)


def _partition_problem(qc_w_ancilla, observables_expanded):
return partition_problem(circuit=qc_w_ancilla, observables=observables_expanded)


def _extract_subcircuits(subexperiments):
individual_subcircuits = []
subcircuit_labels = []

for label, circs in subexperiments.items():
for sub_circ in circs:
individual_subcircuits.append(sub_circ)
subcircuit_labels.append(label)

return individual_subcircuits, subcircuit_labels


def _create_custom_metadata(metadata, partition_labels, qubit_map, subobservables):
custom_metadata = {
**metadata,
"partition_labels": partition_labels,
"qubit_map": qubit_map,
"subobservables": {
partition: subobs.to_labels()[0]
for partition, subobs in subobservables.items()
},
}
return custom_metadata


def reconstruct_distribution(
results: dict[Hashable, SamplerResult] | dict[Hashable, list],
coefficients: Sequence[tuple[float, WeightType]],
qubit_map: Sequence[tuple[int, int]],
subobservables: dict[str, str],
) -> dict[int, float]:
result_dict = defaultdict(float)
labels = []
# ensure order of the labels
for label, _ in qubit_map:
if label not in labels:
labels.append(label)
labels = _order_labels(qubit_map)

if isinstance(labels[0], int):
subobservables = {int(key): val for key, val in subobservables.items()}
results = {int(key): val for key, val in results.items()}

qubits = {key: val.count("Z") for key, val in subobservables.items()}
qubits = _count_qubits_per_label(subobservables)
observable = "".join([subobservables[l] for l in reversed(labels)])

label_index_lists = defaultdict(list)
Expand Down Expand Up @@ -188,30 +148,15 @@ def reconstruct_distribution(
for meas_keys, quasi_prob_vals in product_dicts(
*list(coeff_result_dict.values())
).items():
combined_meas = 0
for meas, label in zip(meas_keys, coeff_result_dict.keys()):
combined_meas += shift_bits_by_index(meas, label_index_lists[label])
combined_meas = _calculate_combined_measurement(
meas_keys, coeff_result_dict, label_index_lists
)
result_dict[combined_meas] += coeff[0] * math.prod(quasi_prob_vals)

if not "I" in observable:
return result_dict

result_dict_traced_out = defaultdict(float)
qubits_to_trace_out = []
for label, sub_obs in subobservables.items():
qubits_to_trace_out_in_sub_obs = list(find_character_in_string(sub_obs, "I"))
qubits_to_trace_out_in_sub_obs = [
len(sub_obs) - i - 1 for i in qubits_to_trace_out_in_sub_obs
]
for qubit_in_sub_obs in qubits_to_trace_out_in_sub_obs:
qubit = label_index_lists[label][qubit_in_sub_obs]
qubits_to_trace_out.append(qubit)

for meas, val in result_dict.items():
meas_traced_out = remove_bits(meas, qubits_to_trace_out)
result_dict_traced_out[meas_traced_out] += val

return result_dict_traced_out
return _trace_out_qubits(result_dict, subobservables, label_index_lists)


def reconstruct_result(input_dict: CombineResultsRequest, quokka_format=False):
Expand All @@ -238,3 +183,76 @@ def reconstruct_result(input_dict: CombineResultsRequest, quokka_format=False):
}

return CombineResultsResponse(result=result)


def _find_cuts(circuit, optimization_settings, device_constraints):
try:
return find_cuts(circuit, optimization_settings, device_constraints)
except ValueError:
return find_cuts(circuit.decompose(), optimization_settings, device_constraints)


def _partition_problem(qc_w_ancilla, observables_expanded):
return partition_problem(circuit=qc_w_ancilla, observables=observables_expanded)


def _extract_subcircuits(subexperiments):
individual_subcircuits = []
subcircuit_labels = []

for label, circs in subexperiments.items():
for sub_circ in circs:
individual_subcircuits.append(sub_circ)
subcircuit_labels.append(label)

return individual_subcircuits, subcircuit_labels


def _create_custom_metadata(metadata, partition_labels, qubit_map, subobservables):
custom_metadata = {
**metadata,
"partition_labels": partition_labels,
"qubit_map": qubit_map,
"subobservables": {
partition: subobs.to_labels()[0]
for partition, subobs in subobservables.items()
},
}
return custom_metadata


def _order_labels(qubit_map):
labels = []
for label, _ in qubit_map:
if label not in labels:
labels.append(label)
return labels


def _count_qubits_per_label(subobservables):
return {key: val.count("Z") for key, val in subobservables.items()}


def _calculate_combined_measurement(meas_keys, coeff_result_dict, label_index_lists):
combined_meas = 0
for meas, label in zip(meas_keys, coeff_result_dict.keys()):
combined_meas += shift_bits_by_index(meas, label_index_lists[label])
return combined_meas


def _trace_out_qubits(result_dict, subobservables, label_index_lists):
result_dict_traced_out = defaultdict(float)
qubits_to_trace_out = []
for label, sub_obs in subobservables.items():
qubits_to_trace_out_in_sub_obs = list(find_character_in_string(sub_obs, "I"))
qubits_to_trace_out_in_sub_obs = [
len(sub_obs) - i - 1 for i in qubits_to_trace_out_in_sub_obs
]
for qubit_in_sub_obs in qubits_to_trace_out_in_sub_obs:
qubit = label_index_lists[label][qubit_in_sub_obs]
qubits_to_trace_out.append(qubit)

for meas, val in result_dict.items():
meas_traced_out = remove_bits(meas, qubits_to_trace_out)
result_dict_traced_out[meas_traced_out] += val
return result_dict_traced_out

0 comments on commit 53ee2b2

Please sign in to comment.