Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/extend qknn #94

Merged
merged 17 commits into from
Oct 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,15 @@
from qhana_plugin_runner.util.plugins import plugin_identifier, QHAnaPluginBase

_plugin_name = "quantum-k-nearest-neighbours"
__version__ = "v0.1.0"
__version__ = "v0.2.0"
_identifier = plugin_identifier(_plugin_name, __version__)


QKNN_BLP = SecurityBlueprint(
_identifier, # blueprint name
__name__, # module import name!
description="Quantum k nearest neighbours plugin API.",
template_folder="",
)


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ def __init__(
raise ValueError(
"A QAM (Quantum Associative Memory) can only load binary data"
)
self.xor_X = self.create_xor_X(X)
self.xor_X = self.create_xor_X(self.X)

if additional_bits is not None:
if not is_binary(additional_bits):
Expand Down Expand Up @@ -90,7 +90,9 @@ def __init__(
self.ancilla_wires = self.ancilla_wires[2:]

if amplitudes is None:
self.amplitudes = [1 / np.sqrt(X.shape[0], dtype=np.float64)] * X.shape[0]
self.amplitudes = [
1 / np.sqrt(self.X.shape[0], dtype=np.float64)
] * self.X.shape[0]
else:
self.amplitudes = amplitudes

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,10 +91,10 @@ def get_qknn_and_total_wires(
SchuldQkNN, max_wires, train_data=train_data, train_labels=train_labels
)
if use_access_wires:
wires[2] = wires[2] + access_wires
wires[3] = wires[3] + access_wires

return SchuldQkNN(
train_data, train_labels, wires[0], wires[1], wires[2], None
train_data, train_labels, wires[0], wires[1], wires[2], wires[3], None
), count_wires(wires)
elif self == QkNNEnum.simple_hamming_qknn:
from .simpleQkNN import SimpleHammingQkNN
Expand All @@ -103,10 +103,10 @@ def get_qknn_and_total_wires(
SimpleHammingQkNN, max_wires, train_data=train_data
)
if use_access_wires:
wires[1] = wires[1] + access_wires
wires[2] = wires[2] + access_wires

return SimpleHammingQkNN(
train_data, train_labels, k, wires[0], wires[1], None
train_data, train_labels, k, wires[0], wires[1], wires[2], None
), count_wires(wires)
elif self == QkNNEnum.simple_fidelity_qknn:
from .simpleQkNN import SimpleFidelityQkNN
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
int_to_bitlist,
check_binary,
ceil_log2,
check_for_duplicates,
)
from ..check_wires import check_wires_uniqueness, check_num_wires

Expand All @@ -33,6 +32,7 @@ def __init__(
self,
train_data: np.ndarray,
train_labels: np.ndarray,
idx_wires: List[int],
train_wires: List[int],
label_wires: List[int],
qam_ancilla_wires: List[int],
Expand All @@ -48,21 +48,20 @@ def __init__(
"All the data needs to be binary, when dealing with the hamming distance",
)

check_for_duplicates(
self.train_data, "The training data may not contain duplicates."
)

self.train_data = np.array(train_data, dtype=int)

self.label_indices = self.init_labels(train_labels)

self.unclean_wires = [] if unclean_wires is None else unclean_wires

self.idx_wires = idx_wires
self.train_wires = train_wires
self.qam_ancilla_wires = qam_ancilla_wires
self.label_wires = label_wires
wire_types = ["train", "label", "qam_ancilla", "unclean"]
wire_types = ["idx", "train", "label", "qam_ancilla", "unclean"]
num_idx_wires = int(np.ceil(np.log2(self.train_data.shape[0])))
num_wires = [
num_idx_wires,
self.train_data.shape[1],
self.label_indices.shape[1],
max(self.train_data.shape[1], 2),
Expand All @@ -76,11 +75,16 @@ def __init__(
check_num_wires(self, wire_types[:-1], num_wires, error_msgs)

self.qam = QAM(
self.train_data,
self.train_wires,
np.array(
[
int_to_bitlist(i, num_idx_wires)
for i in range(self.train_data.shape[0])
]
), # The indices
self.idx_wires,
self.qam_ancilla_wires,
additional_bits=self.label_indices,
additional_wires=self.label_wires,
additional_bits=np.concatenate((self.train_data, self.label_indices), axis=1),
additional_wires=self.train_wires + self.label_wires,
unclean_wires=unclean_wires,
)

Expand All @@ -105,10 +109,13 @@ def get_label_from_samples(self, samples: List[List[int]]) -> int:
is equal to |0>.
"""
label_probs = np.zeros(len(self.unique_labels))
counts = np.zeros(len(self.unique_labels))
for sample in samples:
label = bitlist_to_int(sample[1:])
if sample[0] == 0 and label < len(label_probs):
label_probs[label] += 1
if label < len(label_probs):
counts[label] += 1
return self.unique_labels[label_probs.argmax()]

def get_quantum_circuit(self, x: np.ndarray) -> Callable[[], None]:
Expand All @@ -128,7 +135,7 @@ def get_quantum_circuit(self, x: np.ndarray) -> Callable[[], None]:
def circuit():
self.qam.circuit()
for x_, train_wire in zip(x, self.train_wires):
if x_ == 0:
if x_ == 1:
qml.PauliX((train_wire,))
for train_wire in self.train_wires:
# QAM ancilla wires are 0 after QAM -> use one of those wires
Expand All @@ -147,8 +154,9 @@ def label_point(self, x: np.ndarray) -> int:
@staticmethod
def get_necessary_wires(
train_data: np.ndarray, train_labels: np.ndarray
) -> Tuple[int, int, int]:
) -> Tuple[int, int, int, int]:
return (
int(np.ceil(np.log2(train_data.shape[0]))),
len(train_data[0]),
ceil_log2(len(set(train_labels))),
max(len(train_data[0]), 2),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
from .qknn import QkNN
from ..data_loading_circuits import QAM
from ..data_loading_circuits import TreeLoader
from ..utils import bitlist_to_int, check_binary, ceil_log2, check_for_duplicates
from ..utils import int_to_bitlist, bitlist_to_int, check_binary, ceil_log2
from ..check_wires import check_wires_uniqueness, check_num_wires


Expand Down Expand Up @@ -68,6 +68,7 @@ def __init__(
train_data: np.ndarray,
train_labels: np.ndarray,
k: int,
idx_wires: List[int],
train_wires: List[int],
qam_ancilla_wires: List[int],
backend: qml.Device,
Expand All @@ -79,31 +80,39 @@ def __init__(
self.train_data,
"All the data needs to be binary, when dealing with the hamming distance",
)
check_for_duplicates(
self.train_data, "The training data may not contain duplicates."
)
self.train_data = np.array(train_data, dtype=int)

self.point_num_to_idx = {}
for i, vec in enumerate(self.train_data):
self.point_num_to_idx[bitlist_to_int(vec)] = i

self.unclean_wires = [] if unclean_wires is None else unclean_wires
self.idx_wires = idx_wires
self.train_wires = train_wires
self.qam_ancilla_wires = qam_ancilla_wires
wire_types = ["train", "qam_ancilla", "unclean"]
num_wires = [self.train_data.shape[1], max(self.train_data.shape[1], 2)]
wire_types = ["idx", "train", "qam_ancilla", "unclean"]
num_idx_wires = int(np.ceil(np.log2(self.train_data.shape[0])))
num_wires = [
num_idx_wires,
self.train_data.shape[1],
max(self.train_data.shape[0], 2),
]
error_msgs = [
"the round up log2 of the number of points, i.e. ceil(log2(no. points))."
"the points' dimensionality.",
"the points' dimensionality and greater or equal to 2.",
]

check_wires_uniqueness(self, wire_types)
check_num_wires(self, wire_types[:-1], num_wires, error_msgs)
self.qam = QAM(
self.train_data,
self.train_wires,
np.array(
[
int_to_bitlist(i, num_idx_wires)
for i in range(self.train_data.shape[0])
]
), # The indices
self.idx_wires,
self.qam_ancilla_wires,
unclean_wires=self.unclean_wires,
additional_wires=self.train_wires,
additional_bits=self.train_data,
)

def get_quantum_circuit(self, x: np.ndarray):
Expand All @@ -130,10 +139,8 @@ def quantum_circuit():
for train_wire in self.train_wires:
# QAM ancilla wires are 0 after QAM -> use one of those wires
qml.CRX(rot_angle, wires=(train_wire, self.qam_ancilla_wires[0]))
for x_, train_wire in zip(x, self.train_wires):
if x_ == 0:
qml.PauliX((train_wire,))
return qml.sample(wires=self.train_wires + [self.qam_ancilla_wires[0]])

return qml.sample(wires=self.idx_wires + [self.qam_ancilla_wires[0]])

return quantum_circuit

Expand All @@ -144,8 +151,8 @@ def calculate_distances(self, x: np.ndarray) -> List[float]:
# Count how often a certain point was measured (total_ancilla)
# and how often the ancilla qubit was zero (num_zero_ancilla)
for sample in samples:
idx = self.point_num_to_idx.get(bitlist_to_int(sample[:-1]), None)
if idx is not None:
idx = bitlist_to_int(sample[:-1])
if idx < len(self.train_data):
total_ancilla[idx] += 1
if sample[-1] == 0:
num_zero_ancilla[idx] += 1
Expand All @@ -159,8 +166,12 @@ def calculate_distances(self, x: np.ndarray) -> List[float]:
return num_zero_ancilla

@staticmethod
def get_necessary_wires(train_data: np.ndarray) -> Tuple[int, int]:
return len(train_data[0]), max(len(train_data[0]), 2)
def get_necessary_wires(train_data: np.ndarray) -> Tuple[int, int, int]:
return (
int(np.ceil(np.log2(train_data.shape[0]))),
len(train_data[0]),
max(len(train_data[0]), 2),
)

def get_representative_circuit(self, X: np.ndarray) -> str:
circuit = qml.QNode(self.get_quantum_circuit(X[0]), self.backend)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@


class QuantumBackends(enum.Enum):
custom_ibmq = "custom_ibmq"
pennylane_default = "pennylane_default"
aer_statevector_simulator = "aer_statevector_simulator"
aer_qasm_simulator = "aer_qasm_simulator"
ibmq_qasm_simulator = "ibmq_qasm_simulator"
Expand All @@ -36,7 +36,7 @@ class QuantumBackends(enum.Enum):
ibmq_belem = "ibmq_belem"
ibmq_lima = "ibmq_lima"
ibmq_armonk = "ibmq_armonk"
pennylane_default = "pennylane_default"
custom_ibmq = "custom_ibmq"

def get_max_num_qbits(
self,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ def bitlist_to_int(bitlist: List[int]) -> int:

def int_to_bitlist(num, length: int):
binary = bin(num)[2:]
result = [int(el) for el in reversed(binary)]
result = [int(el) for el in binary]
if len(result) > length:
raise ValueError(
f"Binary representation of {num} needs at least {len(result)} bits, but only got {length}."
Expand All @@ -47,9 +47,3 @@ def check_binary(data: np.ndarray, error_msg: str):

def ceil_log2(value: float) -> int:
return int(np.ceil(np.log2(value)))


def check_for_duplicates(data: np.ndarray, error_msg: str):
print(f"Checking the following data for duplicates:\n{data}")
if len(np.unique(data, axis=0)) != len(data):
raise ValueError(error_msg)

This file was deleted.

Loading
Loading