Skip to content

Commit

Permalink
Updated endpoint parsing (#618)
Browse files Browse the repository at this point in the history
* wip

Signed-off-by: Elena Kolevska <elena@kolevska.com>

* Updates endpoint parsing for new spec

Signed-off-by: Elena Kolevska <elena@kolevska.com>

* Adds support for unix URIs. Clean up.

Signed-off-by: Elena Kolevska <elena@kolevska.com>

* Completes support for all valid grpc endpoints

Signed-off-by: Elena Kolevska <elena@kolevska.com>

* Code cleanup

Signed-off-by: Elena Kolevska <elena@kolevska.com>

* Adds a warning about the http and https schemes being deprecated for gRPC endpoints

Signed-off-by: Elena Kolevska <elena@kolevska.com>

* Updates docs

Signed-off-by: Elena Kolevska <elena@kolevska.com>

* Updates the docs for clarity and correctness

Signed-off-by: Elena Kolevska <elena@kolevska.com>

* Adds anoter test for vsock without a port

Signed-off-by: Elena Kolevska <elena@kolevska.com>

* Adds more test cases and handles dns with ipv6

Signed-off-by: Elena Kolevska <elena@kolevska.com>

* Code cleanup

Signed-off-by: Elena Kolevska <elena@kolevska.com>

* Fixes linter

Signed-off-by: Elena Kolevska <elena@kolevska.com>

* Fixes linter errors

Signed-off-by: Elena Kolevska <elena@kolevska.com>

---------

Signed-off-by: Elena Kolevska <elena@kolevska.com>
  • Loading branch information
elena-kolevska authored Oct 31, 2023
1 parent 6171b67 commit 690ac99
Show file tree
Hide file tree
Showing 8 changed files with 383 additions and 244 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -106,3 +106,6 @@ venv.bak/

# mypy
.mypy_cache/

# OSX specific files
.DS_Store
20 changes: 12 additions & 8 deletions dapr/aio/clients/grpc/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
from dapr.clients.exceptions import DaprInternalError
from dapr.clients.grpc._state import StateOptions, StateItem
from dapr.clients.grpc._helpers import getWorkflowRuntimeStatus
from dapr.conf.helpers import parse_endpoint
from dapr.conf.helpers import GrpcEndpoint
from dapr.conf import settings
from dapr.proto import api_v1, api_service_v1, common_v1
from dapr.proto.runtime.v1.dapr_pb2 import UnsubscribeConfigurationResponse
Expand Down Expand Up @@ -139,14 +139,18 @@ def __init__(
address = settings.DAPR_GRPC_ENDPOINT or (f"{settings.DAPR_RUNTIME_HOST}:"
f"{settings.DAPR_GRPC_PORT}")

self._scheme, self._hostname, self._port = parse_endpoint(address)
try:
self._uri = GrpcEndpoint(address)
except ValueError as error:
raise DaprInternalError(f'{error}') from error

if self._scheme == "https":
self._channel = grpc.aio.secure_channel(f"{self._hostname}:{self._port}",
if self._uri.tls:
self._channel = grpc.aio.secure_channel(self._uri.endpoint,
credentials=self.get_credentials(),
options=options)
options=options) # type: ignore
else:
self._channel = grpc.aio.insecure_channel(address, options) # type: ignore
self._channel = grpc.aio.insecure_channel(self._uri.endpoint,
options) # type: ignore

if settings.DAPR_API_TOKEN:
api_token_interceptor = DaprClientInterceptorAsync([
Expand All @@ -164,7 +168,7 @@ def get_credentials(self):

async def close(self):
"""Closes Dapr runtime gRPC channel."""
if self._channel:
if hasattr(self, '_channel') and self._channel:
self._channel.close()

async def __aenter__(self) -> Self: # type: ignore
Expand Down Expand Up @@ -1442,7 +1446,7 @@ async def wait(self, timeout_s: float):
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.settimeout(timeout_s)
try:
s.connect((self._hostname, self._port))
s.connect((self._uri.hostname, self._uri.port_as_int))
return
except Exception as e:
remaining = (start + timeout_s) - time.time()
Expand Down
31 changes: 17 additions & 14 deletions dapr/clients/grpc/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@
validateNotNone,
validateNotBlankString,
)
from dapr.conf.helpers import parse_endpoint
from dapr.conf.helpers import GrpcEndpoint
from dapr.clients.grpc._request import (
InvokeMethodRequest,
BindingRequest,
Expand Down Expand Up @@ -138,15 +138,18 @@ def __init__(
address = settings.DAPR_GRPC_ENDPOINT or (f"{settings.DAPR_RUNTIME_HOST}:"
f"{settings.DAPR_GRPC_PORT}")

self._scheme, self._hostname, self._port = parse_endpoint(address)
try:
self._uri = GrpcEndpoint(address)
except ValueError as error:
raise DaprInternalError(f'{error}') from error

if self._scheme == "https":
self._channel = grpc.secure_channel(f"{self._hostname}:{self._port}", # type: ignore
if self._uri.tls:
self._channel = grpc.secure_channel(self._uri.endpoint, # type: ignore
self.get_credentials(),

options=options)
else:
self._channel = grpc.insecure_channel(address, options=options) # type: ignore
self._channel = grpc.insecure_channel(self._uri.endpoint, # type: ignore
options=options)

if settings.DAPR_API_TOKEN:
api_token_interceptor = DaprClientInterceptor([
Expand All @@ -166,7 +169,7 @@ def get_credentials(self):

def close(self):
"""Closes Dapr runtime gRPC channel."""
if self._channel:
if hasattr(self, '_channel') and self._channel:
self._channel.close()

def __del__(self):
Expand Down Expand Up @@ -805,8 +808,8 @@ def delete_state(
:class:`DaprResponse` gRPC metadata returned from callee
"""
if metadata is not None:
warn('metadata argument is deprecated. Dapr already intercepts API token headers '
'and this is not needed.', DeprecationWarning, stacklevel=2)
warn('metadata argument is deprecated. Dapr already intercepts API token '
'headers and this is not needed.', DeprecationWarning, stacklevel=2)

if not store_name or len(store_name) == 0 or len(store_name.strip()) == 0:
raise ValueError("State store name cannot be empty")
Expand Down Expand Up @@ -861,8 +864,8 @@ def get_secret(
:class:`GetSecretResponse` object with the secret and metadata returned from callee
"""
if metadata is not None:
warn('metadata argument is deprecated. Dapr already intercepts API token headers '
'and this is not needed.', DeprecationWarning, stacklevel=2)
warn('metadata argument is deprecated. Dapr already intercepts API token '
'headers and this is not needed.', DeprecationWarning, stacklevel=2)

req = api_v1.GetSecretRequest(
store_name=store_name,
Expand Down Expand Up @@ -908,8 +911,8 @@ def get_bulk_secret(
:class:`GetBulkSecretResponse` object with secrets and metadata returned from callee
"""
if metadata is not None:
warn('metadata argument is deprecated. Dapr already intercepts API token headers '
'and this is not needed.', DeprecationWarning, stacklevel=2)
warn('metadata argument is deprecated. Dapr already intercepts API token '
'headers and this is not needed.', DeprecationWarning, stacklevel=2)

req = api_v1.GetBulkSecretRequest(
store_name=store_name,
Expand Down Expand Up @@ -1431,7 +1434,7 @@ def wait(self, timeout_s: float):
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.settimeout(timeout_s)
try:
s.connect((self._hostname, self._port))
s.connect((self._uri.hostname, self._uri.port_as_int))
return
except Exception as e:
remaining = (start + timeout_s) - time.time()
Expand Down
233 changes: 180 additions & 53 deletions dapr/conf/helpers.py
Original file line number Diff line number Diff line change
@@ -1,56 +1,183 @@
from typing import Tuple


def parse_endpoint(address: str) -> Tuple[str, str, int]:
scheme = "http"
fqdn = "localhost"
port = 80
addr = address

addr_list = address.split("://")

if len(addr_list) == 2:
# A scheme was explicitly specified
scheme = addr_list[0]
if scheme == "https":
port = 443
addr = addr_list[1]

addr_list = addr.split(":")
if len(addr_list) == 2:
# A port was explicitly specified
if len(addr_list[0]) > 0:
fqdn = addr_list[0]
# Account for Endpoints of the type http://localhost:3500/v1.0/invoke
addr_list = addr_list[1].split("/")
port = addr_list[0] # type: ignore
elif len(addr_list) == 1:
# No port was specified
# Account for Endpoints of the type :3500/v1.0/invoke
addr_list = addr_list[0].split("/")
fqdn = addr_list[0]
else:
# IPv6 address
addr_list = addr.split("]:")
if len(addr_list) == 2:
# A port was explicitly specified
fqdn = addr_list[0]
fqdn = fqdn.replace("[", "")

addr_list = addr_list[1].split("/")
port = addr_list[0] # type: ignore
elif len(addr_list) == 1:
# No port was specified
addr_list = addr_list[0].split("/")
fqdn = addr_list[0]
fqdn = fqdn.replace("[", "")
fqdn = fqdn.replace("]", "")
from warnings import warn
from urllib.parse import urlparse, parse_qs, ParseResult


class URIParseConfig:
DEFAULT_SCHEME = "dns"
DEFAULT_HOSTNAME = "localhost"
DEFAULT_PORT = 443
DEFAULT_AUTHORITY = ""
ACCEPTED_SCHEMES = ["dns", "unix", "unix-abstract", "vsock", "http", "https", "grpc", "grpcs"]


class GrpcEndpoint:
_scheme: str
_hostname: str
_port: int
_tls: bool
_authority: str
_url: str
_parsed_url: ParseResult # from urllib.parse
_endpoint: str

def __init__(self, url: str):
self._authority = URIParseConfig.DEFAULT_AUTHORITY
self._url = url

self._parsed_url = urlparse(self._preprocess_uri(url))
self._validate_path_and_query()

self._set_tls()
self._set_hostname()
self._set_scheme()
self._set_port()
self._set_endpoint()

def _set_scheme(self):
if len(self._parsed_url.scheme) == 0:
self._scheme = URIParseConfig.DEFAULT_SCHEME
return

if self._parsed_url.scheme in ["http", "https"]:
self._scheme = URIParseConfig.DEFAULT_SCHEME
warn("http and https schemes are deprecated, use grpc or grpcs instead")
return

if self._parsed_url.scheme not in URIParseConfig.ACCEPTED_SCHEMES:
raise ValueError(f"invalid scheme '{self._parsed_url.scheme}' in URL '{self._url}'")

self._scheme = self._parsed_url.scheme

@property
def scheme(self) -> str:
return self._scheme

def _set_hostname(self):
if self._parsed_url.hostname is None:
self._hostname = URIParseConfig.DEFAULT_HOSTNAME
return

if self._parsed_url.hostname.count(":") == 7:
# IPv6 address
self._hostname = f"[{self._parsed_url.hostname}]"
return

self._hostname = self._parsed_url.hostname

@property
def hostname(self) -> str:
return self._hostname

def _set_port(self):
if self._parsed_url.scheme in ["unix", "unix-abstract"]:
self._port = 0
return

if self._parsed_url.port is None:
self._port = URIParseConfig.DEFAULT_PORT
return

self._port = self._parsed_url.port

@property
def port(self) -> str:
if self._port == 0:
return ""

return str(self._port)

@property
def port_as_int(self) -> int:
return self._port

def _set_endpoint(self):
port = "" if not self._port else f":{self.port}"

if self._scheme == "unix":
separator = "://" if self._url.startswith("unix://") else ":"
self._endpoint = f"{self._scheme}{separator}{self._hostname}"
return

if self._scheme == "vsock":
self._endpoint = f"{self._scheme}:{self._hostname}:{self.port}"
return

if self._scheme == "unix-abstract":
self._endpoint = f"{self._scheme}:{self._hostname}{port}"
return

if self._scheme == "dns":
authority = f"//{self._authority}/" if self._authority else ""
self._endpoint = f"{self._scheme}:{authority}{self._hostname}{port}"
return

self._endpoint = f"{self._scheme}:{self._hostname}{port}"

@property
def endpoint(self) -> str:
return self._endpoint

# Prepares the uri string in a specific format for parsing by the urlparse function
def _preprocess_uri(self, url: str) -> str:
url_list = url.split(":")
if len(url_list) == 3 and "://" not in url:
# A URI like dns:mydomain:5000 or vsock:mycid:5000 was used
url = url.replace(":", "://", 1)
elif len(url_list) >= 2 and "://" not in url and url_list[
0] in URIParseConfig.ACCEPTED_SCHEMES:

# A URI like dns:mydomain or dns:[2001:db8:1f70::999:de8:7648:6e8]:mydomain was used
# Possibly a URI like dns:[2001:db8:1f70::999:de8:7648:6e8]:mydomain was used
url = url.replace(":", "://", 1)
else:
raise ValueError(f"Invalid address: {address}")
url_list = url.split("://")
if len(url_list) == 1:
# If a scheme was not explicitly specified in the URL
# we need to add a default scheme,
# because of how urlparse works
url = f'{URIParseConfig.DEFAULT_SCHEME}://{url}'
else:
# If a scheme was explicitly specified in the URL
# we need to make sure it is a valid scheme
scheme = url_list[0]
if scheme not in URIParseConfig.ACCEPTED_SCHEMES:
raise ValueError(f"invalid scheme '{scheme}' in URL '{url}'")

# We should do a special check if the scheme is dns, and it uses
# an authority in the format of dns:[//authority/]host[:port]
if scheme.lower() == "dns":
# A URI like dns://authority/mydomain was used
url_list = url.split("/")
if len(url_list) < 4:
raise ValueError(f"invalid dns authority '{url_list[2]}' in URL '{url}'")
self._authority = url_list[2]
url = f'dns://{url_list[3]}'
return url

def _set_tls(self):
query_dict = parse_qs(self._parsed_url.query)
tls_str = query_dict.get('tls', [""])[0]
tls = tls_str.lower() == 'true'
if self._parsed_url.scheme == "https":
tls = True

self._tls = tls

try:
port = int(port)
except ValueError:
raise ValueError(f"invalid port: {port}")
@property
def tls(self) -> bool:
return self._tls

return scheme, fqdn, port
def _validate_path_and_query(self) -> None:
if self._parsed_url.path:
raise ValueError(f"paths are not supported for gRPC endpoints:"
f" '{self._parsed_url.path}'")
if self._parsed_url.query:
query_dict = parse_qs(self._parsed_url.query)
if 'tls' in query_dict and self._parsed_url.scheme in ["http", "https"]:
raise ValueError(
f"the tls query parameter is not supported for http(s) endpoints: "
f"'{self._parsed_url.query}'")
query_dict.pop('tls', None)
if query_dict:
raise ValueError(f"query parameters are not supported for gRPC endpoints:"
f" '{self._parsed_url.query}'")
Loading

0 comments on commit 690ac99

Please sign in to comment.