Skip to content

Commit

Permalink
Implement support for cryptography api (#719)
Browse files Browse the repository at this point in the history
* feat: add cryptography API in grpc client and add tests

Signed-off-by: KentHsu <chiahaohsu9@gmail.com>

* feat: add cryptography API in async grpc client and add tests

Signed-off-by: KentHsu <chiahaohsu9@gmail.com>

* doc: add encrypt and decrypt examples

Signed-off-by: KentHsu <chiahaohsu9@gmail.com>

* doc: refactor crypto examples

Signed-off-by: KentHsu <chiahaohsu9@gmail.com>

* tests: add more tests for crypto request iterator

Signed-off-by: KentHsu <chiahaohsu9@gmail.com>

* fix: refactor crypto request iterator

Signed-off-by: KentHsu <chiahaohsu9@gmail.com>

---------

Signed-off-by: KentHsu <chiahaohsu9@gmail.com>
Signed-off-by: Bernd Verst <github@bernd.dev>
Co-authored-by: Bernd Verst <github@bernd.dev>
  • Loading branch information
KentHsu and berndverst authored Jul 2, 2024
1 parent f43c0aa commit e52a273
Show file tree
Hide file tree
Showing 21 changed files with 1,728 additions and 3 deletions.
116 changes: 116 additions & 0 deletions dapr/aio/clients/grpc/_request.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
# -*- coding: utf-8 -*-

"""
Copyright 2023 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""

import io
from typing import Union

from dapr.clients.grpc._crypto import EncryptOptions, DecryptOptions
from dapr.clients.grpc._helpers import to_bytes
from dapr.clients.grpc._request import DaprRequest
from dapr.proto import api_v1, common_v1


class EncryptRequestIterator(DaprRequest):
"""An asynchronous iterator for cryptography encrypt API requests.
This reads data from a given stream by chunks and converts it to an asynchronous iterator
of cryptography encrypt API requests.
This iterator will be used for encrypt gRPC bidirectional streaming requests.
"""

def __init__(
self,
data: Union[str, bytes],
options: EncryptOptions,
):
"""Initialize EncryptRequestIterator with data and encryption options.
Args:
data (Union[str, bytes]): data to be encrypted
options (EncryptOptions): encryption options
"""
self.data = io.BytesIO(to_bytes(data))
self.options = options.get_proto()
self.buffer_size = 2 << 10 # 2KiB
self.seq = 0

def __aiter__(self):
"""Returns the iterator object itself."""
return self

async def __anext__(self):
"""Read the next chunk of data from the input stream and create a gRPC stream request."""
# Read data from the input stream, in chunks of up to 2KiB
# Send the data until we reach the end of the input stream
chunk = self.data.read(self.buffer_size)
if not chunk:
raise StopAsyncIteration

payload = common_v1.StreamPayload(data=chunk, seq=self.seq)
if self.seq == 0:
# If this is the first chunk, add the options
request_proto = api_v1.EncryptRequest(payload=payload, options=self.options)
else:
request_proto = api_v1.EncryptRequest(payload=payload)

self.seq += 1
return request_proto


class DecryptRequestIterator(DaprRequest):
"""An asynchronous iterator for cryptography decrypt API requests.
This reads data from a given stream by chunks and converts it to an asynchronous iterator
of cryptography decrypt API requests.
This iterator will be used for encrypt gRPC bidirectional streaming requests.
"""

def __init__(
self,
data: Union[str, bytes],
options: DecryptOptions,
):
"""Initialize DecryptRequestIterator with data and decryption options.
Args:
data (Union[str, bytes]): data to be decrypted
options (DecryptOptions): decryption options
"""
self.data = io.BytesIO(to_bytes(data))
self.options = options.get_proto()
self.buffer_size = 2 << 10 # 2KiB
self.seq = 0

def __aiter__(self):
"""Returns the iterator object itself."""
return self

async def __anext__(self):
"""Read the next chunk of data from the input stream and create a gRPC stream request."""
# Read data from the input stream, in chunks of up to 2KiB
# Send the data until we reach the end of the input stream
chunk = self.data.read(self.buffer_size)
if not chunk:
raise StopAsyncIteration

payload = common_v1.StreamPayload(data=chunk, seq=self.seq)
if self.seq == 0:
# If this is the first chunk, add the options
request_proto = api_v1.DecryptRequest(payload=payload, options=self.options)
else:
request_proto = api_v1.DecryptRequest(payload=payload)

self.seq += 1
return request_proto
91 changes: 91 additions & 0 deletions dapr/aio/clients/grpc/_response.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
# -*- coding: utf-8 -*-

"""
Copyright 2023 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""

from typing import AsyncGenerator, Generic

from dapr.proto import api_v1
from dapr.clients.grpc._response import DaprResponse, TCryptoResponse


class CryptoResponse(DaprResponse, Generic[TCryptoResponse]):
"""An asynchronous iterable of cryptography API responses."""

def __init__(self, stream: AsyncGenerator[TCryptoResponse, None]):
"""Initialize a CryptoResponse.
Args:
stream (AsyncGenerator[TCryptoResponse, None, None]): A stream of cryptography API responses.
"""
self._stream = stream
self._buffer = bytearray()
self._expected_seq = 0

async def __aiter__(self) -> AsyncGenerator[bytes, None]:
"""Read the next chunk of data from the stream.
Yields:
bytes: The payload data of the next chunk from the stream.
Raises:
ValueError: If the sequence number of the next chunk is incorrect.
"""
async for chunk in self._stream:
if chunk.payload.seq != self._expected_seq:
raise ValueError('invalid sequence number in chunk')
self._expected_seq += 1
yield chunk.payload.data

async def read(self, size: int = -1) -> bytes:
"""Read bytes from the stream.
If size is -1, the entire stream is read and returned as bytes.
Otherwise, up to `size` bytes are read from the stream and returned.
If the stream ends before `size` bytes are available, the remaining
bytes are returned.
Args:
size (int): The maximum number of bytes to read. If -1 (the default),
read until the end of the stream.
Returns:
bytes: The bytes read from the stream.
"""
if size == -1:
# Read the entire stream
return b''.join([chunk async for chunk in self])

# Read the requested number of bytes
data = bytes(self._buffer)
self._buffer.clear()

async for chunk in self:
data += chunk
if len(data) >= size:
break

# Update the buffer
remaining = data[size:]
self._buffer.extend(remaining)

# Return the requested number of bytes
return data[:size]


class EncryptResponse(CryptoResponse[api_v1.EncryptResponse]):
...


class DecryptResponse(CryptoResponse[api_v1.DecryptResponse]):
...
100 changes: 100 additions & 0 deletions dapr/aio/clients/grpc/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
)

from dapr.clients.exceptions import DaprInternalError, DaprGrpcError
from dapr.clients.grpc._crypto import EncryptOptions, DecryptOptions
from dapr.clients.grpc._state import StateOptions, StateItem
from dapr.clients.grpc._helpers import getWorkflowRuntimeStatus
from dapr.clients.health import DaprHealth
Expand All @@ -60,6 +61,14 @@
validateNotNone,
validateNotBlankString,
)
from dapr.aio.clients.grpc._request import (
EncryptRequestIterator,
DecryptRequestIterator,
)
from dapr.aio.clients.grpc._response import (
EncryptResponse,
DecryptResponse,
)
from dapr.clients.grpc._request import (
InvokeMethodRequest,
BindingRequest,
Expand Down Expand Up @@ -1227,6 +1236,97 @@ async def unlock(self, store_name: str, resource_id: str, lock_owner: str) -> Un
status=UnlockResponseStatus(response.status), headers=await call.initial_metadata()
)

async def encrypt(self, data: Union[str, bytes], options: EncryptOptions):
"""Encrypt a stream data with given options.
The encrypt API encrypts a stream data with the given options.
Example:
from dapr.aio.clients import DaprClient
from dapr.clients.grpc._crypto import EncryptOptions
async with DaprClient() as d:
options = EncryptOptions(
component_name='crypto_component',
key_name='crypto_key',
key_wrap_algorithm='RSA',
)
resp = await d.encrypt(
data='hello dapr',
options=options,
)
encrypted_data = await resp.read()
Args:
data (Union[str, bytes]): Data to be encrypted.
options (EncryptOptions): Encryption options.
Returns:
Readable stream of `api_v1.EncryptResponse`.
Raises:
ValueError: If component_name, key_name, or key_wrap_algorithm is empty.
"""
# Warnings and input validation
warn(
'The Encrypt API is an Alpha version and is subject to change.',
UserWarning,
stacklevel=2,
)
validateNotBlankString(
component_name=options.component_name,
key_name=options.key_name,
key_wrap_algorithm=options.key_wrap_algorithm,
)

req_iterator = EncryptRequestIterator(data, options)
resp_stream = self._stub.EncryptAlpha1(req_iterator)
return EncryptResponse(resp_stream)

async def decrypt(self, data: Union[str, bytes], options: DecryptOptions):
"""Decrypt a stream data with given options.
The decrypt API decrypts a stream data with the given options.
Example:
from dapr.aio.clients import DaprClient
from dapr.clients.grpc._crypto import DecryptOptions
async with DaprClient() as d:
options = DecryptOptions(
component_name='crypto_component',
key_name='crypto_key',
)
resp = await d.decrypt(
data='hello dapr',
options=options,
)
decrypted_data = await resp.read()
Args:
data (Union[str, bytes]): Data to be decrypted.
options (DecryptOptions): Decryption options.
Returns:
Readable stream of `api_v1.DecryptResponse`.
Raises:
ValueError: If component_name is empty.
"""
# Warnings and input validation
warn(
'The Decrypt API is an Alpha version and is subject to change.',
UserWarning,
stacklevel=2,
)
validateNotBlankString(
component_name=options.component_name,
)

req_iterator = DecryptRequestIterator(data, options)
resp_stream = self._stub.DecryptAlpha1(req_iterator)
return DecryptResponse(resp_stream)

async def start_workflow(
self,
workflow_component: str,
Expand Down
Loading

0 comments on commit e52a273

Please sign in to comment.