Skip to content

Commit

Permalink
Merge pull request #184 from oqc-community/improvement/lc/duration_ti…
Browse files Browse the repository at this point in the history
…meline

Performance improvement: create_duration_timeline
  • Loading branch information
lcauser-oqc authored Nov 7, 2024
2 parents df563ff + 4403006 commit 164afc9
Show file tree
Hide file tree
Showing 2 changed files with 326 additions and 36 deletions.
76 changes: 40 additions & 36 deletions src/qat/purr/compiler/execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -426,50 +426,54 @@ def create_duration_timeline(self, instructions: List[QuantumInstruction]):
total_durations: Dict[PulseChannel, int] = dict()

for instruction in instructions:
for qtarget in instruction.quantum_targets:
# TODO: Acquire is a special quantum target for post processing.
# This should probably be changed.
if isinstance(qtarget, Acquire):
qtarget = qtarget.channel

device_instructions: List[PositionData] = results.setdefault(qtarget, [])
if not any(device_instructions):
sample_start = 0
else:
sample_start = device_instructions[-1].end

# For syncs we want to look at the currently-processed instructions on
# the channels we target, get the max end time then align all of our
# channels to that point in time.
position_data = None
if isinstance(instruction, Synchronize):
current_durations = {
qt: total_durations.setdefault(qt, 0)
for qt in instruction.quantum_targets
}
longest_length = max(current_durations.values(), default=0.0)
# TODO: Acquire is a special quantum target for post processing.
# This should probably be changed.
qtargets = [
qtarget.channel if isinstance(qtarget, Acquire) else qtarget
for qtarget in instruction.quantum_targets
]

# For syncs we want to look at the currently-processed instructions on
# the channels we target, get the max end time then align all of our
# channels to that point in time.
if isinstance(instruction, Synchronize):
longest_length = max(
[total_durations.setdefault(qt, 0) for qt in qtargets],
default=0.0,
)
for qtarget in qtargets:
device_instructions: List[PositionData] = results.setdefault(
qtarget, []
)
delay_time = longest_length - total_durations[qtarget]
if delay_time > 0:
start = (
device_instructions[-1].end if any(device_instructions) else 0
)
instr = Delay(qtarget, delay_time)
position_data = PositionData(
sample_start,
sample_start + calculate_duration(instr),
start,
start + calculate_duration(instr),
instr,
)
else:
position_data = PositionData(
sample_start,
sample_start + calculate_duration(instruction),
instruction,
device_instructions.append(position_data)
total_durations[qtarget] = longest_length
else:
duration = calculate_duration(instruction)
for qtarget in qtargets:
device_instructions: List[PositionData] = results.setdefault(
qtarget, []
)
start = device_instructions[-1].end if any(device_instructions) else 0
device_instructions.append(
PositionData(
start,
start + duration,
instruction,
)
)

if position_data is not None:
device_instructions.append(position_data)

# Calculate running durations for sync/delay evaluation
current_duration = total_durations.setdefault(qtarget, 0)
total_durations[qtarget] = (
current_duration + position_data.instruction.duration
total_durations.get(qtarget, 0) + instruction.duration
)

# Strip timelines that only hold delays and phase resets, since that just means nothing is
Expand Down
286 changes: 286 additions & 0 deletions tests/qat/test_quantum_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
PhaseReset,
PhaseShift,
PostProcessing,
Pulse,
SweepValue,
Variable,
)
Expand Down Expand Up @@ -714,6 +715,291 @@ def test_create_duration_timeline_mapping(self):
*[PostProcessing] * 4,
]

@pytest.mark.parametrize(
"hw", [get_test_model(), get_default_echo_hardware(), get_default_RTCS_hardware()]
)
def test_duration_timeline_aligns(self, hw):
"""
For circuits where quantum targets are synced, check that instructions align.
"""
engine = hw.create_engine()
q1 = hw.get_qubit(0)
q2 = hw.get_qubit(1)

# Build a simple circuit
builder = (
get_builder(hw)
.X(q1, np.pi / 2.0)
.X(q2, np.pi)
.ECR(q1, q2)
.measure_mean_z(q1)
.measure_mean_z(q2)
)
pos_map = engine.create_duration_timeline(builder.instructions)

# Check that pulse instructions start concurrently
q1_cr = q1.get_pulse_channel(ChannelType.cross_resonance, [q2])
q2_crc = q2.get_pulse_channel(ChannelType.cross_resonance_cancellation, [q1])
times_q1_cr = [
inst.start for inst in pos_map[q1_cr] if isinstance(inst.instruction, Pulse)
]
times_q2_crc = [
inst.start for inst in pos_map[q2_crc] if isinstance(inst.instruction, Pulse)
]
assert len(times_q1_cr) > 0
assert len(times_q1_cr) == len(times_q2_crc)
assert all(np.isclose(times_q1_cr, times_q2_crc))

# Check that the acquires start concurrently
q1_acquire = q1.get_acquire_channel()
q2_acquire = q2.get_acquire_channel()
times_q1_acquire = [
inst.start
for inst in pos_map[q1_acquire]
if isinstance(inst.instruction, Acquire)
]
times_q2_acquire = [
inst.start
for inst in pos_map[q2_acquire]
if isinstance(inst.instruction, Acquire)
]
assert len(times_q1_acquire) == 1
assert len(times_q2_acquire) == 1
assert np.isclose(times_q1_acquire[0], times_q2_acquire[0])

# Check that the measures start concurrently
q1_measure = q1.get_measure_channel()
q2_measure = q2.get_measure_channel()
times_q1_measure = [
inst.start
for inst in pos_map[q1_measure]
if isinstance(inst.instruction, MeasurePulse)
]
times_q2_measure = [
inst.start
for inst in pos_map[q2_measure]
if isinstance(inst.instruction, MeasurePulse)
]
assert len(times_q1_measure) == 1
assert len(times_q2_measure) == 1
assert np.isclose(times_q1_measure[0], times_q2_measure[0])

@pytest.mark.parametrize(
"hw", [get_test_model(), get_default_echo_hardware(), get_default_RTCS_hardware()]
)
def test_duration_timeline_times(self, hw):
"""
Tests that the creation of a duration timeline with a two-qubit circuit
gives a timeline where the position map for each pulse channel has instructions
that align
"""
engine = hw.create_engine()
q1 = hw.get_qubit(0)
q2 = hw.get_qubit(1)

# Build a simple circuit
builder = (
get_builder(hw)
.X(q1, np.pi / 2.0)
.X(q2, np.pi)
.cnot(q1, q2)
.measure_mean_z(q1)
.measure_mean_z(q2)
)
res1 = engine.create_duration_timeline(builder.instructions)

# Check the times match up
for val in res1.values():
end = 0
for pos in val:
assert end == pos.start
end = pos.end

def compare_position_maps(self, res1, res2):
for key in res1.keys():
starts1 = [
inst.start
for inst in res1[key]
if (not isinstance(inst.instruction, Delay))
]
starts2 = [
inst.start
for inst in res2[key]
if (not isinstance(inst.instruction, Delay))
]
ends1 = [
inst.start
for inst in res1[key]
if (not isinstance(inst.instruction, Delay))
]
ends2 = [
inst.start
for inst in res2[key]
if (not isinstance(inst.instruction, Delay))
]
assert starts1 == starts2
assert ends1 == ends2

@pytest.mark.parametrize(
"hw", [get_test_model(), get_default_echo_hardware(), get_default_RTCS_hardware()]
)
def test_duration_timeline_sync_ecr(self, hw):
"""
Tests that a redundant sync has no effect on the circuit.
In this example, the ECR adds a sync itself, so syncing before is redundant.
"""
engine = hw.create_engine()
q1 = hw.get_qubit(0)
q2 = hw.get_qubit(1)

# Build a simple circuit
builder = (
get_builder(hw)
.X(q1, np.pi / 2.0)
.X(q2, np.pi)
.ECR(q1, q2)
.measure_mean_z(q1)
.measure_mean_z(q2)
)
res1 = engine.create_duration_timeline(builder.instructions)

# Build the same circuit with an unnecessary sync & check the times match up
builder = (
get_builder(hw)
.X(q1, np.pi / 2.0)
.X(q2, np.pi)
.synchronize([q1, q2])
.ECR(q1, q2)
.measure_mean_z(q1)
.measure_mean_z(q2)
)
res2 = engine.create_duration_timeline(builder.instructions)
self.compare_position_maps(res1, res2)

@pytest.mark.parametrize(
"hw", [get_test_model(), get_default_echo_hardware(), get_default_RTCS_hardware()]
)
def test_duration_timeline_sync_x_pulses(self, hw):
"""
Tests that a redundant sync has no effect on the circuit.
In this example, the two x pulses on Q1 take the same time to execute as the single
pulser in Q2, so the sync is redundant.
"""
engine = hw.create_engine()
q1 = hw.get_qubit(0)
q2 = hw.get_qubit(1)

# Build a simple circuit
builder = (
get_builder(hw)
.X(q1, np.pi / 2.0)
.X(q2, np.pi)
.X(q1, np.pi / 2.0)
.had(q1)
.had(q2)
.measure_mean_z(q1)
.measure_mean_z(q2)
)
res1 = engine.create_duration_timeline(builder.instructions)

# Build the same circuit with an unnecessary sync & check the times match up
builder = (
get_builder(hw)
.X(q1, np.pi / 2.0)
.X(q2, np.pi)
.X(q1, np.pi / 2.0)
.synchronize([q1, q2])
.had(q1)
.had(q2)
.measure_mean_z(q1)
.measure_mean_z(q2)
)
res2 = engine.create_duration_timeline(builder.instructions)
self.compare_position_maps(res1, res2)

def evaluate_circuit_time(self, hw, engine, builder):
qatfile = InstructionEmitter().emit(builder.instructions, hw)
res = engine.create_duration_timeline(qatfile.instructions)
# pulse channels could have different block times
return max([val[-1].end * pc.block_time for pc, val in res.items()])

@pytest.mark.parametrize(
"hw", [get_test_model(), get_default_echo_hardware(), get_default_RTCS_hardware()]
)
def test_duration_timeline_compare(self, hw):
"""
Tests that the duration of individual circuit elements matches that
of the full circuit.
"""
engine = hw.create_engine()
q1 = hw.get_qubit(0)
q2 = hw.get_qubit(1)

# Build a simple circuit
builder = (
get_builder(hw)
.X(q1, np.pi / 2.0)
.X(q2, np.pi)
.ECR(q1, q2)
.measure_mean_z(q1)
.measure_mean_z(q2)
)
maxtime = self.evaluate_circuit_time(hw, engine, builder)

# Check the circuit executes in an expected time
circs = [
get_builder(hw).X(q1, np.pi / 2.0),
get_builder(hw).X(q2, np.pi),
get_builder(hw).ECR(q1, q2),
get_builder(hw).measure_mean_z(q1),
get_builder(hw).measure_mean_z(q2),
]

ts = [self.evaluate_circuit_time(hw, engine, circ) for circ in circs]

# ECR will sync q1 and q2 before and after, so the total execution time is
# a maximum of the X pulses plus a maximum of the measures.
assert np.isclose(max(ts[0], ts[1]) + ts[2] + max(ts[3], ts[4]), maxtime)

@pytest.mark.parametrize(
"hw", [get_test_model(), get_default_echo_hardware(), get_default_RTCS_hardware()]
)
def test_duration_timeline_compare_sync(self, hw):
"""
Tests that the duration of individual circuit elements matches that
of the full circuit when syncs are used.
"""
engine = hw.create_engine()
q1 = hw.get_qubit(0)
q2 = hw.get_qubit(1)

# Check that synchronize gives the expected times
builder = (
get_builder(hw)
.X(q1, np.pi / 2.0)
.synchronize([q1, q2])
.X(q2, np.pi)
.ECR(q1, q2)
.measure_mean_z(q1)
.measure_mean_z(q2)
)
maxtime = self.evaluate_circuit_time(hw, engine, builder)

# Check the circuit executes in an expected time
circs = [
get_builder(hw).X(q1, np.pi / 2.0),
get_builder(hw).X(q2, np.pi),
get_builder(hw).ECR(q1, q2),
get_builder(hw).measure_mean_z(q1),
get_builder(hw).measure_mean_z(q2),
]

ts = [self.evaluate_circuit_time(hw, engine, circ) for circ in circs]

# start time of measures will be synced because of the ECR, so the total
# execution time will only depend on the max of the two measures
assert np.isclose(ts[0] + ts[1] + ts[2] + max(ts[3], ts[4]), maxtime)

@pytest.mark.parametrize("pre_measures", [[0], [1], [0, 1]])
def test_mid_circuit_validation(self, pre_measures):
"""Tests that an error is thrown for mid-circuit measurements."""
Expand Down

0 comments on commit 164afc9

Please sign in to comment.