Skip to content

Commit

Permalink
Fetch selected dataset in remote mode (#273)
Browse files Browse the repository at this point in the history
This closes #258.

What I have implemented is:
- Fetch the selected dataset in remote mode.
- Handle switching the source mode; realtime and remote.


![image](https://github.com/snu-quiqcl/iquip/assets/76851886/cc9bd9e1-39db-4d21-827f-af616619b4fc)
  • Loading branch information
BECATRUE authored Apr 3, 2024
2 parents d75120b + d088385 commit 0617938
Showing 1 changed file with 141 additions and 42 deletions.
183 changes: 141 additions & 42 deletions iquip/apps/dataviewer.py
Original file line number Diff line number Diff line change
Expand Up @@ -420,6 +420,7 @@ class SourceWidget(QWidget):
"""Widget for data source selection.
Signals:
datasetClicked(name): The dataset with name is clicked.
axisApplied(axis): Axis parameter selection apply button is clicked.
See SimpleScanDataPolicy.extract() for axis argument.
modeClicked(id): The source mode with id is clicked.
Expand All @@ -433,6 +434,7 @@ class SourceWidget(QWidget):
stack: The stacked widget for additional interface of each source option.
"""

datasetClicked = pyqtSignal(str)
axisApplied = pyqtSignal(tuple)
modeClicked = pyqtSignal(int)

Expand Down Expand Up @@ -479,6 +481,7 @@ def __init__(self, parent: Optional[QWidget] = None):
layout = QVBoxLayout(self)
layout.addLayout(datasetLayout)
layout.addLayout(sourceLayout)
self.datasetBox.currentTextChanged.connect(self.datasetClicked)
self.axisBoxes["X"].currentIndexChanged.connect(self._handleXIndexChanged)
self.axisApplyButton.clicked.connect(self._handleApplyClicked)
self.buttonGroup.idClicked.connect(self.stack.setCurrentIndex)
Expand Down Expand Up @@ -857,6 +860,10 @@ def __init__(self, parent: Optional[QWidget] = None):
self.addWidget(mainPlotBox)
self.addWidget(toolBox)

def sourceMode(self) -> SourceWidget.ButtonId:
"""Returns the current source mode."""
return SourceWidget.ButtonId(self.sourceWidget.buttonGroup.checkedId())

def datasetName(self) -> str:
"""Returns the current dataset name in the line edit."""
return self.sourceWidget.datasetBox.currentText()
Expand Down Expand Up @@ -884,10 +891,12 @@ def __init__(self, ip: str, port: int, parent: Optional[QObject] = None):
"""
super().__init__(parent=parent)
self.url = f"ws://{ip}:{port}/dataset/master/list/"
self.websocket: ClientConnection
self.websocket: Optional[ClientConnection] = None

def stop(self):
"""Stops the thread."""
if self.websocket is None:
return
try:
self.websocket.close()
except WebSocketException:
Expand All @@ -909,7 +918,7 @@ def run(self):
logger.exception("Failed to fetch the dataset name list.")


class _RealtimeFetcherThread(QThread):
class _RealtimeDatasetThread(QThread):
"""QThread for fetching the dataset in ARTIQ master from the proxy server.
Signals:
Expand Down Expand Up @@ -1080,11 +1089,76 @@ def run(self):
self.fetched.emit(filter_dataset_list(response.json()))


class _RemoteDatasetThread(QThread):
"""QThread for fetching the dataset in a specific RID.
Signals:
fetched(dataset, parameters, units): Information for the dataset is fetched.
See `SimpleScanDataPolicy` for argument description.
Attributes:
url: GET request url.
rid, name: Target RID and dataset name, respectively.
"""

fetched = pyqtSignal(np.ndarray, list, list)

# pylint: disable=too-many-arguments
def __init__(self, rid: int, name: str, ip: str, port: int, parent: Optional[QObject] = None):
"""Extended.
Args:
rid, name: See the attributes section.
ip, port: IP address and PORT number of the proxy server.
"""
super().__init__(parent=parent)
self.url = f"http://{ip}:{port}/dataset/rid/"
self.rid = rid
self.name = name

def _get(self, params: Dict[str, Any], default: Any = None) -> Any:
"""Returns the response of the given GET request.
When an exception occurs, it is delivered to the caller.
Args:
params: GET request parameters.
default: Return value replaced when the response is None.
"""
response = requests.get(self.url, params=params, timeout=5)
response.raise_for_status()
rawResponse = response.json()
if rawResponse is None:
return default
return rawResponse

def run(self):
"""Overridden."""
try:
rawDataset = self._get({"rid": self.rid, "key": self.name})
if rawDataset is None: # no dataset
return
dataset = np.array(rawDataset)
numParameters = dataset.shape[1] if dataset.ndim > 1 else 0
parameters = self._get({"rid": self.rid, "key": f"{self.name}.parameters"},
list(map(str, range(numParameters))))
rawUnits = self._get({"rid": self.rid, "key": f"{self.name}.units"})
if rawUnits:
units = [unit if unit else None for unit in rawUnits]
else:
units = [None] * numParameters
except requests.exceptions.RequestException:
logger.exception("Failed to fetch the dataset in a specific RID.")
return
self.fetched.emit(dataset, parameters, units)


class DataViewerApp(qiwis.BaseApp): # pylint: disable=too-many-instance-attributes
"""App for data visualization.
Attributes:
frame: DataViewerFrame instance.
*Part: Source mode widget in frame corresponding to the name.
*Thread: The most recently executed thread instance corresponding to the name.
policy: Data policy instance. None if there is currently no data.
axis: The current plot axis parameter indices. See SimpleScanDataPolicy.extract().
Expand All @@ -1095,27 +1169,29 @@ def __init__(self, name: str, parent: Optional[QObject] = None):
"""Extended."""
super().__init__(name, parent=parent)
self.frame = DataViewerFrame()
self.realtimeFetcherThread: Optional[_RealtimeFetcherThread] = None
self.realtimePart, self.remotePart = (self.frame.sourceWidget.stack.widget(buttonId)
for buttonId in SourceWidget.ButtonId)
self.realtimeDatasetThread: Optional[_RealtimeDatasetThread] = None
self.realtimeListThread: Optional[_RealtimeListThread] = None
self.ridListOfDateHourThread: _RidListOfDateHourThread
self.remoteListThread: _RemoteListThread
self.remoteDatasetThread: _RemoteDatasetThread
self.policy: Optional[SimpleScanDataPolicy] = None
self.axis: Tuple[int, ...] = ()
self.dataPointIndex: Tuple[int, ...] = ()
self.startRealtimeDatasetListThread()
realtimePart, remotePart = (self.frame.sourceWidget.stack.widget(buttonId)
for buttonId in SourceWidget.ButtonId)
# signal connection
realtimePart.syncToggled.connect(self._toggleSync)
realtimePart.restartButton.clicked.connect(self.startRealtimeDatasetListThread)
remotePart.dateHourChanged.connect(self.startRidListOfDateHourThread)
remotePart.ridClicked.connect(self.startRemoteListThread)
self.realtimePart.syncToggled.connect(self._toggleSync)
self.realtimePart.restartButton.clicked.connect(self.startRealtimeDatasetListThread)
self.remotePart.dateHourChanged.connect(self.startRidListOfDateHourThread)
self.remotePart.ridClicked.connect(self.startRemoteListThread)
self.frame.sourceWidget.modeClicked.connect(self.switchSourceMode)
self.frame.sourceWidget.datasetClicked.connect(self._handleDatasetClicked)
self.frame.sourceWidget.axisApplied.connect(self.setAxis)
self.frame.dataPointWidget.dataTypeChanged.connect(self.setDataType)
self.frame.dataPointWidget.thresholdChanged.connect(self.setThreshold)
self.frame.mainPlotWidget.dataClicked.connect(self.selectDataPoint)
remotePart.updateRidComboBox()
self.remotePart.updateRidComboBox()

@pyqtSlot(int)
def switchSourceMode(self, buttonId: int):
Expand All @@ -1129,23 +1205,34 @@ def switchSourceMode(self, buttonId: int):
self.startRealtimeDatasetListThread()
else:
self.realtimeListThread.stop()
remotePart: _RemotePart = self.frame.sourceWidget.stack.widget(
SourceWidget.ButtonId.REMOTE
)
self.startRemoteListThread(remotePart.ridComboBox.currentText())
self.startRemoteListThread(self.remotePart.ridComboBox.currentText())

@pyqtSlot(str)
def _handleDatasetClicked(self, name: str):
"""Called when the given dataset name is clicked.
Args:
See SourceWidget.datasetClicked signal.
"""
if not name:
return
mode = self.frame.sourceMode()
if mode == SourceWidget.ButtonId.REALTIME:
if self.realtimePart.syncButton.isChecked(): # if in sync, stop it
self.realtimePart.syncButton.click()
else:
self.startRemoteDatasetThread(name)

@pyqtSlot()
def startRealtimeDatasetListThread(self):
"""Creates and starts a new _RealtimeListThread instance."""
realtimePart: _RealtimePart = self.frame.sourceWidget.stack.widget(
SourceWidget.ButtonId.REALTIME
)
self.realtimeListThread = _RealtimeListThread(
self.constants.proxy_ip, # pylint: disable=no-member
self.constants.proxy_port, # pylint: disable=no-member
)
self.realtimeListThread.fetched.connect(self._updateDatasetBox, type=Qt.QueuedConnection)
self.realtimeListThread.finished.connect(
functools.partial(realtimePart.restartButton.setEnabled, True),
functools.partial(self.realtimePart.restartButton.setEnabled, True),
type=Qt.QueuedConnection
)
self.realtimeListThread.finished.connect(self.realtimeListThread.deleteLater)
Expand Down Expand Up @@ -1175,30 +1262,28 @@ def _toggleSync(self, checked: bool):
if checked:
self.synchronize()
else:
self.realtimeFetcherThread.stop()
self.realtimeDatasetThread.stop()

def synchronize(self):
"""Fetches the dataset from artiq master and updates the viewer."""
realtimePart: _RealtimePart = self.frame.sourceWidget.stack.widget(
SourceWidget.ButtonId.REALTIME
)
realtimePart.setStatus(message="Start synchronizing.")
self.realtimeFetcherThread = _RealtimeFetcherThread(
self.realtimePart.setStatus(message="Start synchronizing.")
self.realtimeDatasetThread = _RealtimeDatasetThread(
self.frame.datasetName(),
realtimePart.periodSpinBox.value(),
self.realtimePart.periodSpinBox.value(),
self.constants.proxy_ip, # pylint: disable=no-member
self.constants.proxy_port, # pylint: disable=no-member
)
self.realtimeFetcherThread.initialized.connect(self.setDataset, type=Qt.QueuedConnection)
self.realtimeFetcherThread.modified.connect(self.modifyDataset, type=Qt.QueuedConnection)
self.realtimeFetcherThread.stopped.connect(realtimePart.setStatus, type=Qt.QueuedConnection)
self.realtimeFetcherThread.finished.connect(
functools.partial(realtimePart.setStatus, sync=False, enable=True),
self.realtimeDatasetThread.initialized.connect(self.setDataset, type=Qt.QueuedConnection)
self.realtimeDatasetThread.modified.connect(self.modifyDataset, type=Qt.QueuedConnection)
self.realtimeDatasetThread.stopped.connect(
self.realtimePart.setStatus, type=Qt.QueuedConnection)
self.realtimeDatasetThread.finished.connect(
functools.partial(self.realtimePart.setStatus, sync=False, enable=True),
type=Qt.QueuedConnection,
)
self.realtimeFetcherThread.finished.connect(self.realtimeFetcherThread.deleteLater)
self.realtimeFetcherThread.start()
realtimePart.setStatus(enable=True)
self.realtimeDatasetThread.finished.connect(self.realtimeDatasetThread.deleteLater)
self.realtimeDatasetThread.start()
self.realtimePart.setStatus(enable=True)

@pyqtSlot(str, object)
def startRidListOfDateHourThread(self, date: str, hour: Optional[int]):
Expand All @@ -1224,11 +1309,8 @@ def updateRidList(self, rids: List[int]):
Args:
See _RidListOfDateHourThread.fetched signal.
"""
remotePart: _RemotePart = self.frame.sourceWidget.stack.widget(
SourceWidget.ButtonId.REMOTE
)
remotePart.ridComboBox.clear()
remotePart.ridComboBox.addItems(list(map(str, rids)))
self.remotePart.ridComboBox.clear()
self.remotePart.ridComboBox.addItems(list(map(str, rids)))

@pyqtSlot(str)
def startRemoteListThread(self, rid: str):
Expand All @@ -1249,6 +1331,23 @@ def startRemoteListThread(self, rid: str):
self.remoteListThread.finished.connect(self.remoteListThread.deleteLater)
self.remoteListThread.start()

def startRemoteDatasetThread(self, name: str):
"""Creates and starts a new _RemoteDatasetThread instance.
Args:
See _RemoteDatasetThread.__init__().
"""
rid = int(self.remotePart.ridComboBox.currentText())
self.remoteDatasetThread = _RemoteDatasetThread(
rid,
name,
self.constants.proxy_ip, # pylint: disable=no-member
self.constants.proxy_port, # pylint: disable=no-member
)
self.remoteDatasetThread.fetched.connect(self.setDataset, type=Qt.QueuedConnection)
self.remoteDatasetThread.finished.connect(self.remoteDatasetThread.deleteLater)
self.remoteDatasetThread.start()

@pyqtSlot(np.ndarray, list, list)
def setDataset(
self,
Expand All @@ -1269,7 +1368,7 @@ def modifyDataset(self, modifications: List[Dict[str, Any]]):
"""Modifies the dataset and updates the plot.
Args:
See _RealtimeFetcherThread.modified signal.
See _RealtimeDatasetThread.modified signal.
"""
# TODO(kangz12345@snu.ac.kr): Implement modifications other than "append".
if self.policy is None:
Expand All @@ -1282,9 +1381,9 @@ def modifyDataset(self, modifications: List[Dict[str, Any]]):
self.policy.dataset = np.concatenate((self.policy.dataset, appended))
if self.axis:
self.updateMainPlot(self.axis, self.frame.dataPointWidget.dataType())
self.realtimeFetcherThread.mutex.lock()
self.realtimeFetcherThread.mutex.unlock()
self.realtimeFetcherThread.modifyDone.wakeAll()
self.realtimeDatasetThread.mutex.lock()
self.realtimeDatasetThread.mutex.unlock()
self.realtimeDatasetThread.modifyDone.wakeAll()

@pyqtSlot(tuple)
def setAxis(self, axis: Sequence[int]):
Expand Down

0 comments on commit 0617938

Please sign in to comment.