diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index c182d02..587c45b 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -22,13 +22,16 @@ jobs: build-rust: name: Build (Rust) - runs-on: ubuntu-20.04 + runs-on: ubuntu-latest strategy: matrix: arch: - amd64 - arm64 + features: + - sockets + - "" steps: - name: Set arch ${{ matrix.arch }} @@ -61,44 +64,76 @@ jobs: echo "profilestr=--profile $PROFILE" >> $GITHUB_ENV fi - uses: actions/checkout@v3 - - uses: Swatinem/rust-cache@v2 - with: - key: ${{ matrix.arch }}-${{ env.PROFILE }} - uses: actions-rs/toolchain@v1 with: toolchain: stable override: true target: ${{ env.rustarch }} + - uses: Swatinem/rust-cache@v2 + with: + key: ${{ matrix.arch }}-${{ env.PROFILE }} + prefix-key: v1-rust-${{ matrix.features && format('features_{0}', matrix.features) || 'nofeatures' }} # Increase to invalidate old caches. - name: Build (${{ matrix.arch }}) uses: actions-rs/cargo@v1 with: use-cross: ${{ env.is_cross }} command: build - args: --target ${{ env.rustarch }} ${{ env.profilestr }} + args: --target ${{ env.rustarch }} ${{ matrix.features && format('--features {0}', matrix.features) }} ${{ env.profilestr }} - name: Upload Artifact uses: actions/upload-artifact@v3 with: - name: binaries-${{ matrix.arch }} + name: binaries-${{ matrix.arch }}-${{ matrix.features }} path: | target/${{ env.rustarch }}/${{ env.PROFILE }}/connect test: name: Run tests needs: [ build-rust ] - runs-on: ubuntu-latest + runs-on: ubuntu-22.04 + + strategy: + matrix: + features: + - "" + - "sockets" steps: - - name: Not implemented yet - run: echo "This will be implemented soonish" + - uses: actions/checkout@v3 + - uses: actions/download-artifact@v3 + with: + name: binaries-amd64-${{ matrix.features }} + path: artifacts/binaries-amd64/ + - name: Set diffrent image tag + run: | + if [[ ${{ format('"{0}"', matrix.features) }} == 'sockets' ]]; then + echo "TAG=develop-sockets" >> $GITHUB_ENV + fi + - name: Start containers + run: ./dev/start ci + # - name: Show logs + # working-directory: ./dev + # run: | + # sleep 3 + # docker compose logs + - name: Run tests + run: cargo test ${{ format('--features "{0}"', matrix.features) }} docker: needs: [ build-rust, pre-check, test ] + + strategy: + matrix: + features: + - "" + - "sockets" + # This workflow defines how a maven package is built, tested and published. # Visit: https://github.com/samply/github-workflows/blob/develop/.github/workflows/docker-ci.yml, for more information uses: samply/github-workflows/.github/workflows/docker-ci.yml@main with: # The Docker Hub Repository you want eventually push to, e.g samply/share-client image-name: "samply/beam-connect" + image-tag-suffix: ${{ matrix.features && format('-{0}', matrix.features) }} # Define special prefixes for docker tags. They will prefix each images tag. # image-tag-prefix: "foo" # Define the build context of your image, typically default '.' will be enough @@ -107,9 +142,8 @@ jobs: build-file: './Dockerfile.ci' # NOTE: This doesn't work currently # A list of build arguments, passed to the docker build -# build-args: | -# PROFILE=${{ env.PROFILE }} -# COMPONENT=broker + build-args: | + FEATURE=-${{ matrix.features }} # Define the target platforms of the docker build (default "linux/amd64,linux/arm64/v8") # build-platforms: "linux/amd64,linux/arm64" # If your actions generate an artifact in a previous build step, you can tell this workflow to download it diff --git a/Cargo.toml b/Cargo.toml index 36f17cc..7e59782 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "connect" -version = "0.1.0" +version = "0.1.1" edition = "2021" license = "Apache-2.0" @@ -18,30 +18,36 @@ inherits = "release" strip = false [dependencies] -shared = { git = "https://github.com/samply/beam", branch="develop" } +beam-lib = { git = "https://github.com/samply/beam", branch="develop", features = ["strict-ids"] } -#axum = "0.5.12" tokio = { version = "1", features = ["macros","rt-multi-thread","signal"] } -hyper = { version = "0", features = ["full"] } -tower-http = { version = "0", features = ["trace"] } -tower = "*" +hyper = { version = "0.14", features = ["full"] } # HTTP client with proxy support -hyper-tls = "0.5.0" -hyper-proxy = "0.9.1" -mz-http-proxy = { version = "0.1.0", features = ["hyper"] } +reqwest = { version = "0.11.19", features = ["json", "stream"] } -log = "*" -pretty_env_logger = "*" +tracing = "0.1" serde = "*" serde_json = "*" -hyper_serde = "0.13" -clap = {version = "4", features = ["derive"]} +clap = { version = "4", features = ["derive", "env"] } thiserror = "*" -http-serde = "1.1.2" +http-serde = "1.1" +tokio-native-tls = "0.3.1" +tracing-subscriber = { version = "0.3", features = ["env-filter"] } +anyhow = "1" +openssl = "*" # Already used by native_tls which does not reexport it. This is used for b64 en/decode + +[features] +sockets = ["beam-lib/sockets"] [build-dependencies] build-data = "0" + +[dev-dependencies] +once_cell = "1" +futures-util = "0.3.28" +paste = "1.0.12" +tokio-tungstenite = "0.20.0" diff --git a/Cross.toml b/Cross.toml index 8139e11..e253e5b 100644 --- a/Cross.toml +++ b/Cross.toml @@ -1,5 +1,5 @@ [target.aarch64-unknown-linux-gnu] -image = "ghcr.io/lablans/cross-test:aarch64-unknown-linux-gnu" +pre-build = ["dpkg --add-architecture arm64 && apt-get update && apt-get install --assume-yes libssl-dev:arm64 && rm -rf /var/lib/apt/lists/*"] [target.x86_64-unknown-linux-gnu] -image = "ghcr.io/lablans/cross-test:x86_64-unknown-linux-gnu" +pre-build = ["dpkg --add-architecture amd64 && apt-get update && apt-get install --assume-yes libssl-dev:amd64 && rm -rf /var/lib/apt/lists/*"] diff --git a/Dockerfile.ci b/Dockerfile.ci index 2dd4017..dd462af 100644 --- a/Dockerfile.ci +++ b/Dockerfile.ci @@ -3,11 +3,17 @@ ARG IMGNAME=gcr.io/distroless/cc FROM alpine AS chmodder +ARG FEATURE ARG TARGETARCH -COPY /artifacts/binaries-$TARGETARCH/connect /app/ +COPY /artifacts/binaries-$TARGETARCH$FEATURE/connect /app/ RUN chmod +x /app/* -FROM ${IMGNAME} +# FROM ${IMGNAME} +FROM ubuntu:latest +RUN apt update +RUN apt install -y ca-certificates +RUN apt install -y ssl-cert +RUN make-ssl-cert generate-default-snakeoil #ARG COMPONENT ARG TARGETARCH #COPY /artifacts/binaries-$TARGETARCH/$COMPONENT /usr/local/bin/ diff --git a/README.md b/README.md index 022a0a7..910c1f7 100644 --- a/README.md +++ b/README.md @@ -34,7 +34,7 @@ The following command line parameters are required: * `PROXY_URL`: The URL of the local Samply.Proxy which is used to connect to the Samply.Broker * `APP_ID`: The BeamId of the Beam.Connect application * `LOCAL_TARGETS_FILE`: The path to the local service resolution file (see [Routing Section](#Request-Routing)). - * `DISCOVERY_URL`: The URL that is queried to receive the central service discovery this may also be a local file (see [Routing Section](#Request-Routing)). + * `DISCOVERY_URL`: The URL (or local file) to be is queried to receive the central service discovery (see [Routing Section](#Request-Routing)). The following command line parameter is only used in Receiver mode (see [Usage Section](#usage)): * `PROXY_APIKEY`: In Receiver Mode, the API key with which this Beam.Connector is registered for listening at the Samply.Broker @@ -42,6 +42,9 @@ The following command line parameter is only used in Receiver mode (see [Usage S The following command line parameter is optional, as it uses a default value: * `BIND_ADDR`: The interface and port Beam.Connect is listening on. Defaults to `0.0.0.0:8062`. +If the following flag is optional. + * `NO_AUTH`: Samply.Beam.Connect does not require a `Proxy Authorization` header, i.e. it forwards requests without (client) authentication + All parameters can be given as environment variables instead. ### Run using Docker @@ -58,9 +61,10 @@ docker run -e PROXY_URL='' \ -e DISCOVERY_URL='' \ -e PROXY_APIKEY='' \ -e BIND_ADDR='' \ + -e NO_AUTH='true' \ samply/beam-connect ``` -Again, the last environment variable `PROXY_APIKEY` is only required for usage in Receiver Mode and `BIND_ADDR` is optional. +Again, the environment variable `PROXY_APIKEY` is only required for usage in Receiver Mode. `BIND_ADDR` and `NO_AUTH` are optional. ### Use Beam.Connect to forward a HTTP request We give an example [cURL](https://curl.se/) request showing the usage of Beam.Connect to access an internal service within University Hospital #23 (`uk23`): @@ -87,7 +91,14 @@ A mishap in communication will be returned as appropriate HTTP replies. As described in the [command line parameter list](#run-as-an-application), the central cite discovery is fetched from a given URL or local json file. However, to spare the local services from the need to express outward facing connections themselves, Samply.Beam.Connect exports this received information as a local REST endpoint: `GET http://:/sites`. Note, that the information is only fetched at startup and remains static for the program's lifetime. +#### HTTPS support + +Https is supported but requires setting up the following parameters: +* `SSL_CERT_PEM`: Location to the pem file used for incoming SSL connections. +* `SSL_CERT_KEY`: Location to the corresponding key file for the SSL connections. +* `TLS_CA_CERTIFICATES_DIR`: May need to be set if the local target uses a self signed certificate which is not trusted by beam-connect. In this case the certificate of the target must be placed inside `TLS_CA_CERTIFICATES_DIR` as a pem file in order to be trusted. + ## Notes -At the moment Samply.Beam.Connect does not implement streaming and does not support HTTPS connections. In the intended usage scenario, both Samply.Beam.Connect and Samply.Beam.Proxy are positioned right next to each other in the same privileged network and thus speak plain HTTP. Of course, for outgoing traffic, the Samply.Proxy signs and encrypts the payloads on its own. +At the moment Samply.Beam.Connect does not implement streaming. In the intended usage scenario, both Samply.Beam.Connect and Samply.Beam.Proxy are positioned right next to each other in the same privileged network and thus speak plain HTTP or [HTTPS if configured](#https). Of course, for outgoing traffic, the Samply.Proxy signs and encrypts the payloads on its own. In Receiving Mode, Beam.Connect only relays requests to allow-listed resources to mitigate possible misuse. diff --git a/dev/docker-compose.yml b/dev/docker-compose.yml index 524edfc..20323c8 100644 --- a/dev/docker-compose.yml +++ b/dev/docker-compose.yml @@ -1,7 +1,7 @@ version: "3.7" services: vault: - image: vault + image: hashicorp/vault ports: - 127.0.0.1:8200:8200 environment: @@ -21,7 +21,6 @@ services: BROKER_URL: ${BROKER_URL} PKI_ADDRESS: http://vault:8200 no_proxy: vault - NO_PROXY: vault PRIVKEY_FILE: /run/secrets/dummy.pem BIND_ADDR: 0.0.0.0:8080 RUST_LOG: ${RUST_LOG} @@ -37,12 +36,10 @@ services: environment: BROKER_URL: ${BROKER_URL} PROXY_ID: ${PROXY1_ID} - APP_0_ID: ${APP1_ID_SHORT} - APP_0_KEY: ${APP_KEY} + APP_app1_KEY: ${APP_KEY} PRIVKEY_FILE: /run/secrets/proxy1.pem BIND_ADDR: 0.0.0.0:8081 RUST_LOG: ${RUST_LOG} - NO_PROXY: broker no_proxy: broker secrets: - proxy1.pem @@ -53,7 +50,6 @@ services: build: context: ../ dockerfile: Dockerfile.ci - image: samply/beam-connect:${TAG} ports: - 8062:8062 volumes: @@ -64,19 +60,18 @@ services: PROXY_APIKEY: ${APP_KEY} DISCOVERY_URL: "./map/example_central_test.json" RUST_LOG: ${RUST_LOG} - NO_PROXY: proxy1 - no_proxy: proxy1 + no_proxy: proxy1,my.example.com connect2: depends_on: - proxy2 build: context: ../ dockerfile: Dockerfile.ci - image: samply/beam-connect:${TAG} ports: - 8063:8063 volumes: - ../examples/:/map + - ssl-cert:/custom-cert environment: PROXY_URL: "http://proxy2:8082" BIND_ADDR: 0.0.0.0:8063 @@ -85,8 +80,8 @@ services: DISCOVERY_URL: "./map/example_central_test.json" LOCAL_TARGETS_FILE: "./map/example_local_test.json" RUST_LOG: ${RUST_LOG} - NO_PROXY: proxy2 - no_proxy: proxy2 + no_proxy: proxy2,my.example.com + TLS_CA_CERTIFICATES_DIR: /custom-cert proxy2: depends_on: [broker] image: samply/beam-proxy:${TAG} @@ -95,16 +90,28 @@ services: environment: BROKER_URL: ${BROKER_URL} PROXY_ID: ${PROXY2_ID} - APP_0_ID: ${APP2_ID_SHORT} - APP_0_KEY: ${APP_KEY} + APP_app2_KEY: ${APP_KEY} PRIVKEY_FILE: /run/secrets/proxy2.pem BIND_ADDR: 0.0.0.0:8082 RUST_LOG: ${RUST_LOG} - NO_PROXY: broker no_proxy: broker secrets: - proxy2.pem - root.crt.pem + ws-echo: + image: jmalloc/echo-server + environment: + PORT: 80 + echo: + image: mendhak/http-https-echo + container_name: my.example.com # We set this so that we can connect via this common name so that the ssl cert CN matches + environment: + - HTTP_PORT=80 + - HTTPS_PORT=443 + volumes: + - ssl-cert:/app/custom-cert + entrypoint: ["sh", "-c", "cp ./fullchain.pem ./custom-cert/cert.pem && node ./index.js"] + user: "0:0" secrets: pki.secret: file: ./pki/pki.secret @@ -116,3 +123,6 @@ secrets: file: ./pki/dummy.priv.pem root.crt.pem: file: ./pki/root.crt.pem + +volumes: + ssl-cert: diff --git a/dev/start b/dev/start index 4e33223..d1507bf 100755 --- a/dev/start +++ b/dev/start @@ -53,17 +53,17 @@ function build() { CONNECT=./target/debug/connect if [ ! -x ./artifacts/binaries-$ARCH ]; then echo "Binaries missing -- building ..." - BUILD="$(cargo build --message-format=json)" + BUILD="$(cargo build $@ --message-format=json)" echo "Will rebuild docker image since binaries had not been there." mkdir -p artifacts/binaries-$ARCH rsync "$CONNECT" artifacts/binaries-$ARCH/ BUILD_DOCKER=1 - elif [ -z "$(docker images -q samply/beam-broker:$TAG)" ] || [ -z "$(docker images -q samply/beam-proxy:$TAG)" ]; then + elif [ -z "$(docker images -q samply/beam-connect:$TAG)" ]; then echo "Will rebuild docker image since it is missing." BUILD_DOCKER=1 elif [ -x ./target ]; then echo "Checking for changed Rust source code ..." - BUILD="$(cargo build --message-format=json)" + BUILD="$(cargo build $@ --message-format=json)" if echo $BUILD | jq 'select(.fresh==false)' | grep -q 'fresh'; then echo "Will rebuild docker image due to changes in rust binaries." rsync "$CONNECT" artifacts/binaries-$ARCH/ @@ -104,8 +104,46 @@ function start { clean pki/pki devsetup echo "$VAULT_TOKEN" > ./pki/pki.secret - build + build $@ docker-compose up --no-build --no-recreate --abort-on-container-exit } -start +function start_ci { + unset IMGNAME + clean + pki/pki devsetup + echo "$VAULT_TOKEN" > ./pki/pki.secret + build $@ + docker-compose up --no-build --no-recreate -d + for ADDR in $P1 $P2; do + TRIES=1 + while [ $TRIES -ne 0 ]; do + set +e + CODE=$(curl -s -o /tmp/body -w '%{response_code}' $ADDR/v1/health) + set -e + if [ "$CODE" == "200" ]; then + TRIES=0 + else + echo "Waiting for $ADDR ... (try ${TRIES}/30, last response was: code=$OUT, body=\"$(cat /tmp/body 2>/dev/null)\")" + sleep 1 + ((TRIES=TRIES+1)) + if [ $TRIES -ge 30 ]; then + echo "ERROR: $ADDR not available after 30 seconds. Giving up and printing docker compose logs." + docker-compose logs + exit 5 + fi + fi + done + done + echo "Services are up!" +} + +case "$1" in + ci) + shift + start_ci $@ + ;; + *) + start $@ + ;; +esac diff --git a/dev/test.py b/dev/test.py deleted file mode 100644 index eec5010..0000000 --- a/dev/test.py +++ /dev/null @@ -1,43 +0,0 @@ - -try: - import requests -except ImportError: - import os - os.system("python3 -m pip install requests") - import requests - -import unittest - -class TestConnect(unittest.TestCase): - - def test_normal_request(self): - res = request_connect("http://httpbin.org/anything") - self.assertEqual(res.status_code, 200, "Could not make normal request via connect") - - def test_json_body(self): - json = { - "foo": "bar", - "asdf": 3, - "baz": [{}, 2] - } - res = request_connect("http://httpbin.org/anything", json=json) - self.assertEqual(res.status_code, 200, "Could not make normal request via connect") - self.assertEqual(res.json().get("json"), json, "Json did not match") - - -def request_connect(url: str, json = {}, app_id: str = "app1.proxy1.broker", app_secret: str = "App1Secret", proxy_url: str = "http://localhost:8062") -> requests.Response: - proxies = { - "http": proxy_url - } - headers = { - "Proxy-Authorization": f"ApiKey {app_id} {app_secret}", - "Accept": "application/json" - } - return requests.get(url, json=json, proxies=proxies, headers=headers) - - -def main(): - unittest.main() - -if __name__ == "__main__": - main() diff --git a/examples/example_central_test.json b/examples/example_central_test.json index ba96edd..fd5f552 100644 --- a/examples/example_central_test.json +++ b/examples/example_central_test.json @@ -3,7 +3,31 @@ { "id": "C2", "name": "connect2", - "virtualhost": "httpbin.org", + "virtualhost": "echo-get", + "beamconnect": "app2.proxy2.broker" + }, + { + "id": "C2", + "name": "connect2", + "virtualhost": "echo-get:443", + "beamconnect": "app2.proxy2.broker" + }, + { + "id": "C2", + "name": "connect2", + "virtualhost": "echo-post", + "beamconnect": "app2.proxy2.broker" + }, + { + "id": "C2", + "name": "connect2", + "virtualhost": "echo-post:443", + "beamconnect": "app2.proxy2.broker" + }, + { + "id": "ws-echo", + "name": "ws-echo", + "virtualhost": "ws-echo", "beamconnect": "app2.proxy2.broker" } ] diff --git a/examples/example_local_targets.json b/examples/example_local_targets.json index f7be15c..1389bdb 100644 --- a/examples/example_local_targets.json +++ b/examples/example_local_targets.json @@ -1,5 +1,5 @@ [ {"external": "uk1.virtual","internal":"ifconfig.me","allowed":["connect.uk1.broker.example","connect.uk2.broker.example"]}, {"external": "uk2.virtual","internal":"ip-api.com","allowed":["connect.uk2.broker.example","connect.uk1.broker.example"]}, - {"external": "node23.uk12.network","internal":"host23.internal.network","allowed":["connect.uk2.broker.example","connect.uk12.broker.example"]} + {"external": "node23.uk12.network","internal":"host23.internal.network/node23","allowed":["connect.uk2.broker.example","connect.uk12.broker.example"]} ] diff --git a/examples/example_local_test.json b/examples/example_local_test.json index 5440f32..f9e8fac 100644 --- a/examples/example_local_test.json +++ b/examples/example_local_test.json @@ -1,7 +1,27 @@ [ { - "external": "httpbin.org", - "internal": "httpbin.org", + "external": "echo-get", + "internal": "my.example.com/get", + "allowed": ["app1.proxy1.broker"] + }, + { + "external": "echo-post", + "internal": "my.example.com/post", + "allowed": ["app1.proxy1.broker"] + }, + { + "external": "echo-get:443", + "internal": "my.example.com:443/get", + "allowed": ["app1.proxy1.broker"] + }, + { + "external": "echo-post:443", + "internal": "my.example.com:443/post", + "allowed": ["app1.proxy1.broker"] + }, + { + "external": "ws-echo", + "internal": "ws-echo:80", "allowed": ["app1.proxy1.broker"] } ] diff --git a/src/banner.rs b/src/banner.rs index c920c1d..9455a60 100644 --- a/src/banner.rs +++ b/src/banner.rs @@ -1,4 +1,4 @@ -use log::info; +use tracing::info; pub fn print_banner() { let commit = match env!("GIT_DIRTY") { diff --git a/src/config.rs b/src/config.rs index 8eb5ef9..ebb594a 100644 --- a/src/config.rs +++ b/src/config.rs @@ -1,10 +1,12 @@ -use std::{error::Error, path::PathBuf, fs::File, str::FromStr}; +use std::{path::PathBuf, fs::{read_to_string, self}, str::FromStr, sync::Arc}; +use anyhow::Result; use clap::Parser; -use hyper::{Uri, http::uri::{Authority, Scheme}}; -use log::info; +use hyper::{Uri, http::uri::Authority}; +use reqwest::{Certificate, Client}; +use tokio_native_tls::{TlsAcceptor, native_tls::{self, Identity}}; use serde::{Serialize, Deserialize}; -use shared::{beam_id::{AppId, BeamId, app_to_broker_id, BrokerId}, http_client::{SamplyHttpClient, self}}; +use beam_lib::{AppId, set_broker_id}; use crate::{example_targets, errors::BeamConnectError}; @@ -20,7 +22,7 @@ impl FromStr for PathOrUri { fn from_str(s: &str) -> Result { match Uri::try_from(s) { Ok(uri) => Ok(Self::Uri(uri)), - Err(e_uri) => { + Err(..) => { let p = PathBuf::from(s); if p.is_file() { Ok(Self::Path(p)) @@ -39,7 +41,7 @@ struct CliArgs { #[clap(long, env, value_parser)] proxy_url: Uri, - /// Your short App ID (e.g. connect1) + /// Your App ID (e.g. connect1.proxy1.broker) #[clap(long, env, value_parser)] app_id: String, @@ -63,9 +65,22 @@ struct CliArgs { #[clap(long, env, value_parser)] tls_ca_certificates_dir: Option, + /// Pem file used for ssl support. Will use a snakeoil pem if unset. + #[clap(long, env, value_parser, default_value = "/etc/ssl/certs/ssl-cert-snakeoil.pem")] + ssl_cert_pem: PathBuf, + + /// Key file used for ssl support. Will use a snakeoil key if unset. + #[clap(long, env, value_parser, default_value = "/etc/ssl/private/ssl-cert-snakeoil.key")] + ssl_cert_key: PathBuf, + /// Expiry time of the request in seconds #[clap(long, env, value_parser, default_value = "3600")] expire: u64, + + /// If set will enable any local apps to authenticate without the `Proxy-Authorization` header. + /// Security note: This allows any app with network access to beam-connect to send requests to any other beam-connect service in the beam network. + #[clap(long, env, action)] + no_auth: bool, } #[derive(Serialize, Deserialize,Clone,Debug)] @@ -94,7 +109,7 @@ pub(crate) struct Site { pub(crate) beamconnect: AppId, } -#[derive(Clone,Deserialize,Debug)] +#[derive(Clone, Deserialize, Debug)] pub(crate) struct LocalMapping { pub(crate) entries: Vec } @@ -110,13 +125,45 @@ impl LocalMapping { } /// Maps an external authority to some internal authority if the requesting App is allowed to -#[derive(Clone,Deserialize,Debug)] +#[derive(Clone, Deserialize, Debug)] pub(crate) struct LocalMappingEntry { #[serde(with = "http_serde::authority", rename="external")] pub(crate) needle: Authority, // Host part of URL - #[serde(with = "http_serde::authority", rename="internal")] - pub(crate) replace: Authority, - pub(crate) allowed: Vec + #[serde(rename="internal")] + pub(crate) replace: AuthorityReplacement, + pub(crate) allowed: Vec, + #[serde(default, rename = "forceHttps")] + pub(crate) force_https: bool, +} + +#[derive(Debug, Clone, PartialEq)] +pub struct AuthorityReplacement { + pub authority: Authority, + pub path: Option +} + +impl From for AuthorityReplacement { + fn from(authority: Authority) -> Self { + Self { authority, path: None } + } +} + +impl<'de> Deserialize<'de> for AuthorityReplacement { + fn deserialize(deserializer: D) -> Result + where D: serde::Deserializer<'de> + { + let string = String::deserialize(deserializer)?; + match string.split_once('/') { + Some((auth, path)) => Ok(Self { + authority: auth.parse().map_err(serde::de::Error::custom)?, + path: Some(path.to_owned()), + }), + None => Ok(Self { + authority: string.parse().map_err(serde::de::Error::custom)?, + path: None, + }) + } + } } #[derive(Clone)] @@ -129,10 +176,12 @@ pub(crate) struct Config { pub(crate) targets_local: LocalMapping, pub(crate) targets_public: CentralMapping, pub(crate) expire: u64, - pub(crate) client: SamplyHttpClient + pub(crate) client: Client, + pub(crate) tls_acceptor: Arc, + pub(crate) no_auth: bool, } -fn load_local_targets(broker_id: &BrokerId, local_target_path: &Option) -> Result> { +fn load_local_targets(broker_id: &str, local_target_path: &Option) -> Result { if let Some(json_file) = local_target_path { if json_file.exists() { let json_string = std::fs::read_to_string(json_file)?; @@ -142,66 +191,82 @@ fn load_local_targets(broker_id: &BrokerId, local_target_path: &Option) Ok(example_targets::example_local(broker_id)) } -async fn load_public_targets(client: &SamplyHttpClient, url: &PathOrUri) -> Result { - let bytes = match url { +async fn load_public_targets(client: &Client, url: &PathOrUri) -> Result { + match url { PathOrUri::Path(path) => { - std::fs::read_to_string(path).map_err(|e| BeamConnectError::ConfigurationError(format!("Failed to open central config file: {e}")))?.into() + serde_json::from_slice(&std::fs::read(path).map_err(|e| BeamConnectError::ConfigurationError(format!("Failed to open central config file: {e}")))?) }, PathOrUri::Uri(url) => { - let mut response = client.get(url - .to_string() - .try_into() - .map_err(|e| BeamConnectError::ConfigurationError(format!("Invalid url for public sites: {e}")))? - ).await - .map_err(|e| BeamConnectError::ConfigurationError(format!("Cannot retreive central service discovery configuration: {e}")) - )?; - - let body = response.body_mut(); - hyper::body::to_bytes(body).await.map_err(|e| BeamConnectError::ConfigurationError(format!("Invalid central site discovery response: {e}")))? + Ok(client.get(url.to_string()) + .send().await + .map_err(|e| BeamConnectError::ConfigurationError(format!("Cannot retrieve central service discovery configuration: {e}")))? + .json() + .await + .map_err(|e| BeamConnectError::ConfigurationError(format!("Invalid central site discovery response: {e}")))? + ) }, - }; - - let deserialized = serde_json::from_slice::(&bytes) - .map_err(|e| BeamConnectError::ConfigurationError(format!("Cannot parse central service discovery configuration: {e}")))?; + }.map_err(|e| BeamConnectError::ConfigurationError(format!("Cannot parse central service discovery configuration: {e}"))) +} - Ok(deserialized) +fn build_client(tls_cert_dir: Option<&PathBuf>) -> Result { + let mut client_builder = Client::builder(); + if let Some(tls_ca_dir) = tls_cert_dir { + for path_res in tls_ca_dir.read_dir()? { + if let Ok(path_buf) = path_res { + client_builder = client_builder.add_root_certificate(Certificate::from_pem(&fs::read(path_buf.path())?)?); + } + } + } + Ok(client_builder.build()?) } impl Config { - pub(crate) async fn load() -> Result> { + pub(crate) async fn load() -> Result { let args = CliArgs::parse(); - let broker_id = app_to_broker_id(&args.app_id)?; - AppId::set_broker_id(broker_id.clone()); - let my_app_id = AppId::new(&args.app_id)?; - let broker_id = BrokerId::new(&broker_id)?; - + let broker_id = args.app_id + .splitn(3, '.') + .last() + .ok_or_else(|| BeamConnectError::ConfigurationError(format!("Invalid beam id: {}", args.app_id)))?; + set_broker_id(broker_id.to_owned()); + let app_id = AppId::new(&args.app_id)?; + let expire = args.expire; - - let tls_ca_certificates = shared::crypto::load_certificates_from_dir(args.tls_ca_certificates_dir)?; - let client = http_client::build(&tls_ca_certificates, None, None)?; + let client = build_client(args.tls_ca_certificates_dir.as_ref())?; let targets_public = load_public_targets(&client, &args.discovery_url).await?; let targets_local = load_local_targets(&broker_id, &args.local_targets_file)?; + let identity = Identity::from_pkcs8( + read_to_string(args.ssl_cert_pem)?.as_bytes(), + read_to_string(args.ssl_cert_key)?.as_bytes(), + ).expect("Failed to initialize identity for tls acceptor"); + let tls_acceptor = Arc::new(native_tls::TlsAcceptor::new(identity) + .expect("Failed to initialize tls acceptor") + .into() + ); + Ok(Config { proxy_url: args.proxy_url, - my_app_id: my_app_id.clone(), - proxy_auth: format!("ApiKey {} {}", my_app_id, args.proxy_apikey), + my_app_id: app_id.clone(), + proxy_auth: format!("ApiKey {} {}", app_id, args.proxy_apikey), bind_addr: args.bind_addr, + no_auth: args.no_auth, targets_local, targets_public, expire, - client + client, + tls_acceptor }) } } #[cfg(test)] mod tests { + use beam_lib::set_broker_id; + use super::CentralMapping; use super::LocalMapping; use crate::example_targets::example_local; - use shared::beam_id::{BrokerId,BeamId,app_to_broker_id}; #[test] fn serde_authority() { @@ -233,6 +298,7 @@ mod tests { } ] }"#; + set_broker_id("broker.ccp-it.dktk.dkfz.de".to_owned()); let obj: CentralMapping = serde_json::from_str(serialized).unwrap(); assert_eq!(obj.sites.len(), 4); let mut routes = obj.sites.iter(); @@ -248,14 +314,13 @@ mod tests { #[test] fn local_target_configuration() { - let broker_id = app_to_broker_id("foo.bar.broker.example").unwrap(); - BrokerId::set_broker_id(broker_id.clone()); - let broker_id = BrokerId::new(&broker_id).unwrap(); + let broker_id = "broker.ccp-it.dktk.dkfz.de"; + set_broker_id(broker_id.to_owned()); let serialized = r#"[ - {"external": "ifconfig.me","internal":"ifconfig.me","allowed":["connect1.proxy23.broker.example","connect2.proxy23.broker.example"]}, - {"external": "ip-api.com","internal":"ip-api.com","allowed":["connect1.proxy23.broker.example","connect2.proxy23.broker.example"]}, - {"external": "wttr.in","internal":"wttr.in","allowed":["connect1.proxy23.broker.example","connect2.proxy23.broker.example"]}, - {"external": "node23.uk12.network","internal":"host23.internal.network","allowed":["connect1.proxy23.broker.example","connect2.proxy23.broker.example"]} + {"external": "ifconfig.me","internal":"ifconfig.me/asdf","allowed":["connect1.proxy23.broker.ccp-it.dktk.dkfz.de","connect2.proxy23.broker.ccp-it.dktk.dkfz.de"]}, + {"external": "ip-api.com","internal":"ip-api.com","allowed":["connect1.proxy23.broker.ccp-it.dktk.dkfz.de","connect2.proxy23.broker.ccp-it.dktk.dkfz.de"]}, + {"external": "wttr.in","internal":"wttr.in","allowed":["connect1.proxy23.broker.ccp-it.dktk.dkfz.de","connect2.proxy23.broker.ccp-it.dktk.dkfz.de"]}, + {"external": "node23.uk12.network","internal":"host23.internal.network","allowed":["connect1.proxy23.broker.ccp-it.dktk.dkfz.de","connect2.proxy23.broker.ccp-it.dktk.dkfz.de"]} ]"#; let obj: LocalMapping = LocalMapping{entries:serde_json::from_str(serialized).unwrap()}; let expect = example_local(&broker_id); diff --git a/src/errors.rs b/src/errors.rs index dad412c..e2569f0 100644 --- a/src/errors.rs +++ b/src/errors.rs @@ -1,15 +1,17 @@ -use std::{str::Utf8Error, string::FromUtf8Error}; +use std::string::FromUtf8Error; use hyper::Uri; -use shared::beam_id::AppOrProxyId; +use beam_lib::AppOrProxyId; use thiserror::Error; #[derive(Error,Debug)] pub(crate) enum BeamConnectError { + #[error("Regular proxy timeout")] + ProxyTimeoutError, #[error("Proxy rejected our authorization")] ProxyRejectedAuthorization, #[error("Unable to communicate with Proxy: {0}")] - ProxyHyperError(hyper::Error), + ProxyReqwestError(reqwest::Error), #[error("Unable to communicate with Proxy: {0}")] ProxyOtherError(String), #[error("Constructing HTTP request failed: {0}")] @@ -21,7 +23,7 @@ pub(crate) enum BeamConnectError { #[error("Unable to communicate with target host: {0}")] CommunicationWithTargetFailed(String), #[error("Unable to fetch reply from target host: {0}")] - FailedToReadTargetsReply(hyper::Error), + FailedToReadTargetsReply(reqwest::Error), #[error("Response was not valid UTF-8: {0}")] ResponseNotValidUtf8String(#[from] FromUtf8Error), #[error("Reply invalid: {0}")] diff --git a/src/example_targets.rs b/src/example_targets.rs index e609f4a..b6251a8 100644 --- a/src/example_targets.rs +++ b/src/example_targets.rs @@ -1,21 +1,22 @@ use hyper::http::uri::Authority; -use shared::beam_id::{AppId, BeamId, BrokerId, ProxyId}; +use beam_lib::{AppId, ProxyId}; -use crate::config::{CentralMapping, LocalMapping, LocalMappingEntry}; +use crate::config::{LocalMapping, LocalMappingEntry}; -pub(crate) fn example_local(broker_id: &BrokerId) -> LocalMapping { +pub(crate) fn example_local(broker_id: &str) -> LocalMapping { let proxy23 = ProxyId::new(&format!("proxy23.{}", broker_id)).unwrap(); let app1_id = AppId::new(&format!("connect1.{}",proxy23)).unwrap(); let app2_id = AppId::new(&format!("connect2.{}",proxy23)).unwrap(); let map = LocalMapping {entries: [ - ("ifconfig.me", "ifconfig.me", vec![app1_id.clone(), app2_id.clone()]), + ("ifconfig.me", "ifconfig.me/asdf", vec![app1_id.clone(), app2_id.clone()]), ("ip-api.com", "ip-api.com", vec![app1_id.clone(), app2_id.clone()]), ("wttr.in", "wttr.in", vec![app1_id.clone(), app2_id.clone()]), ("node23.uk12.network", "host23.internal.network", vec![app1_id, app2_id]) ].map(|(needle,replace,allowed)| LocalMappingEntry { needle: Authority::from_static(needle), - replace: Authority::from_static(replace), - allowed + replace: serde_json::from_value(serde_json::Value::String(replace.to_owned())).unwrap(), + allowed, + force_https: false }) .into_iter() .collect()}; diff --git a/src/logic_ask.rs b/src/logic_ask.rs index 5c14941..16bd50b 100644 --- a/src/logic_ask.rs +++ b/src/logic_ask.rs @@ -1,14 +1,13 @@ use std::{sync::Arc, str::FromStr}; -use std::time::{Duration, SystemTime}; -use hyper::{Request, Body, Client, client::HttpConnector, Response, header, StatusCode, body, Uri}; -use hyper_proxy::ProxyConnector; -use hyper_tls::HttpsConnector; -use log::{info, debug, warn, error}; +use hyper::http::HeaderValue; +use hyper::http::uri::{Authority, Scheme}; +use hyper::{Request, Body, Response, header, StatusCode, body, Uri}; +use tracing::{info, debug, warn, error}; use serde_json::Value; -use shared::http_client::SamplyHttpClient; -use shared::{beam_id::AppId, MsgTaskResult, MsgTaskRequest}; +use beam_lib::{AppId, TaskResult, TaskRequest, WorkStatus, FailureStrategy, MsgId}; -use crate::{config::Config, structs::MyStatusCode, msg::{HttpRequest, HttpResponse}, errors::BeamConnectError}; +use crate::config::CentralMapping; +use crate::{config::Config, structs::MyStatusCode, msg::{HttpRequest, HttpResponse}}; /// GET http://some.internal.system?a=b&c=d /// Host: @@ -16,56 +15,90 @@ use crate::{config::Config, structs::MyStatusCode, msg::{HttpRequest, HttpRespon pub(crate) async fn handler_http( mut req: Request, config: Arc, - client: SamplyHttpClient + https_authority: Option, ) -> Result, MyStatusCode> { + let targets = &config.targets_public; let method = req.method().to_owned(); let uri = req.uri().to_owned(); + + let host_header_auth = req.headers() + .get(header::HOST) + .and_then(|v| v.to_str().ok()) + .and_then(|v| dbg!(Authority::from_str(v)).ok()); + let authority = https_authority + .as_ref() + .or(uri.authority()); + if authority.is_none() && uri.path() == "/sites" { + // Case 1 for sites request: no authority set and /sites + return respond_with_sites(targets) + } + let authority = authority + .or(host_header_auth.as_ref()); + let Some(authority) = authority else { + return Err(StatusCode::BAD_REQUEST.into()) + }; let headers = req.headers_mut(); headers.insert(header::VIA, format!("Via: Samply.Beam.Connect/0.1 {}", config.my_app_id).parse().unwrap()); - let auth = - headers.remove(header::PROXY_AUTHORIZATION) - .ok_or(StatusCode::PROXY_AUTHENTICATION_REQUIRED)?; + let auth = if config.no_auth { + headers + .remove(header::PROXY_AUTHORIZATION) + .unwrap_or(config.proxy_auth.parse().expect("Proxy auth header could not be generated.")) + } else { + headers.remove(header::PROXY_AUTHORIZATION).ok_or(StatusCode::PROXY_AUTHENTICATION_REQUIRED)? + }; // Re-pack Authorization: Not necessary since we're not looking at the Authorization header. // if headers.remove(header::AUTHORIZATION).is_some() { // return Err(StatusCode::CONFLICT.into()); // } - // If the autority is empty (e.g. if localhost is used) or the authoroty is not in the routing - // table AND the path is /sites, return global routing table - if let Some(path) = uri.path_and_query() { - if (uri.authority().is_none() || targets.get(uri.authority().unwrap()).is_none()) && path == "/sites" { - debug!("Central Site Discovery requested"); - let body = body::Body::from(serde_json::to_string(targets)?); - let response = Response::builder() - .status(200) - .body(body) - .unwrap(); - return Ok(response); - - } - } - - let target = &targets.get(uri.authority().unwrap()) //TODO unwrap - .ok_or(StatusCode::UNAUTHORIZED)? - .beamconnect; + let Some(target) = &targets + .get(authority) + .map(|target| &target.beamconnect) else { + return if uri.path() == "/sites" { + // Case 2: target not in sites and /sites + respond_with_sites(targets) + } else { + warn!("Failed to lookup virtualhost in central mapping: {}", authority); + Err(StatusCode::UNAUTHORIZED.into()) + } + }; info!("{method} {uri} via {target}"); - let msg = http_req_to_struct(req, &config.my_app_id, &target, &config.expire).await?; + // Set the right authority as it might have been passed by the caller because it was a CONNECT request + *req.uri_mut() = { + let mut parts = req.uri().to_owned().into_parts(); + parts.authority = Some(authority.clone()); + if https_authority.is_some() { + parts.scheme = Some(Scheme::HTTPS); + } else { + parts.scheme = Some(Scheme::HTTP); + } + Uri::from_parts(parts).map_err(|e| { + warn!("Could not transform uri authority: {e}"); + StatusCode::INTERNAL_SERVER_ERROR + })? + }; + #[cfg(feature = "sockets")] + return crate::sockets::handle_via_sockets(req, &config, target, auth).await; + #[cfg(not(feature = "sockets"))] + return handle_via_tasks(req, &config, target, auth).await; +} + +async fn handle_via_tasks(req: Request, config: &Arc, target: &AppId, auth: HeaderValue) -> Result, MyStatusCode> { + let msg = http_req_to_struct(req, &config.my_app_id, &target, config.expire).await?; // Send to Proxy - let req_to_proxy = Request::builder() - .method("POST") - .uri(format!("{}v1/tasks", config.proxy_url)) + debug!("SENDING request to Proxy: {msg:?}"); + let resp = config.client.post(format!("{}v1/tasks", config.proxy_url)) .header(header::AUTHORIZATION, auth.clone()) - .body(body::Body::from(serde_json::to_vec(&msg)?)) - .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; - debug!("SENDING request to Proxy: {:?}, {:?}", msg, req_to_proxy); - let resp = client.request(req_to_proxy).await + .json(&msg) + .send() + .await .map_err(|_| StatusCode::BAD_GATEWAY)?; if resp.status() != StatusCode::CREATED { return Err(StatusCode::BAD_GATEWAY.into()); @@ -84,12 +117,12 @@ pub(crate) async fn handler_http( .path_and_query(format!("{}/results?wait_count=1&wait_timeout=10000", location.path())) .build().unwrap(); // TODO debug!("Fetching reply from Proxy: {results_uri}"); - let req = Request::builder() + let resp = config.client + .get(results_uri.to_string()) .header(header::AUTHORIZATION, auth) .header(header::ACCEPT, "application/json") - .uri(results_uri) - .body(body::Body::empty()).unwrap(); - let mut resp = client.request(req).await + .send() + .await .map_err(|e| { warn!("Got error from server: {e}"); StatusCode::BAD_GATEWAY @@ -110,9 +143,7 @@ pub(crate) async fn handler_http( } } - let bytes = body::to_bytes(resp.body_mut()).await - .map_err(|_| StatusCode::BAD_GATEWAY)?; - let mut task_results = serde_json::from_slice::>(&bytes) + let mut task_results = resp.json::>>().await .map_err(|e| { warn!("Unable to parse HTTP result: {}", e); StatusCode::BAD_GATEWAY @@ -125,14 +156,11 @@ pub(crate) async fn handler_http( } let result = task_results.pop().unwrap(); let response_inner = match result.status { - shared::WorkStatus::Succeeded => { - serde_json::from_str::(&result.body.body.ok_or({ - warn!("Recieved one sucessfull result but it has no body"); - StatusCode::BAD_GATEWAY - })?)? + WorkStatus::Succeeded => { + result.body }, e => { - warn!("Reply had unexpected workresult code: {}", e); + warn!("Reply had unexpected workresult code: {e:?}"); return Err(StatusCode::BAD_GATEWAY)?; } }; @@ -159,32 +187,43 @@ pub(crate) async fn handler_http( Ok(resp) } -async fn http_req_to_struct(req: Request, my_id: &AppId, target_id: &AppId, expire: &u64) -> Result { +async fn http_req_to_struct(req: Request, my_id: &AppId, target_id: &AppId, expire: u64) -> Result, MyStatusCode> { let method = req.method().clone(); let url = req.uri().clone(); let headers = req.headers().clone(); let body = body::to_bytes(req).await .map_err(|e| { - println!("{e}"); + warn!("Failed to read body: {e}"); StatusCode::BAD_REQUEST })?; - let body = String::from_utf8(body.to_vec())?; let http_req = HttpRequest { method, url, headers, - body, + body: body.to_vec(), + }; + let msg = TaskRequest { + from: my_id.clone().into(), + to: vec![target_id.clone().into()], + body: http_req, + failure_strategy: FailureStrategy::Discard, + metadata: Value::Null, + ttl: format!("{expire}s"), + id: MsgId::new() }; - let mut msg = MsgTaskRequest::new( - my_id.into(), - vec![target_id.into()], - serde_json::to_string(&http_req)?, - shared::FailureStrategy::Discard, - Value::Null - ); - - msg.expire = SystemTime::now() + Duration::from_secs(*expire); Ok(msg) } + +/// If the authority is empty (e.g. if localhost is used) or the authoroty is not in the routing +/// table AND the path is /sites, return global routing table +fn respond_with_sites(targets: &CentralMapping) -> Result, MyStatusCode> { + debug!("Central Site Discovery requested"); + let body = body::Body::from(serde_json::to_string(targets)?); + let response = Response::builder() + .status(200) + .body(body) + .unwrap(); + Ok(response) +} diff --git a/src/logic_reply.rs b/src/logic_reply.rs index 54b7222..ca18c15 100644 --- a/src/logic_reply.rs +++ b/src/logic_reply.rs @@ -1,18 +1,18 @@ -use hyper::{Client, client::HttpConnector, Request, header, StatusCode, body, Response, Body, Uri}; -use hyper_proxy::ProxyConnector; -use hyper_tls::HttpsConnector; -use log::{info, warn, debug}; +use beam_lib::{TaskRequest, TaskResult, WorkStatus, AppOrProxyId}; +use hyper::{header, StatusCode, body, Uri, Method, http::uri::PathAndQuery}; +use tracing::{info, warn, debug}; use serde_json::Value; -use shared::{MsgTaskRequest, MsgTaskResult, MsgId,beam_id::{BeamId,AppId}, WorkStatus, Plain, http_client::SamplyHttpClient}; +use reqwest::{Client, Response}; -use crate::{config::Config, errors::BeamConnectError, msg::{IsValidHttpTask, HttpResponse}}; +use crate::{config::Config, errors::BeamConnectError, msg::{HttpResponse, HttpRequest}}; -pub(crate) async fn process_requests(config: Config, client: SamplyHttpClient) -> Result<(), BeamConnectError> { +pub(crate) async fn process_requests(config: Config, client: Client) -> Result<(), BeamConnectError> { // Fetch tasks from Proxy let msgs = fetch_requests(&config, &client).await?; for task in msgs { - let resp = execute_http_task(&task, &config, &client).await?; + // If we fail to execute the http task we should report this as a failure to beam + let resp = execute_http_task(&task, &config, &client).await; send_reply(&task, &config, &client, resp).await?; } @@ -20,91 +20,123 @@ pub(crate) async fn process_requests(config: Config, client: SamplyHttpClient) - Ok(()) } -async fn send_reply(task: &MsgTaskRequest, config: &Config, client: &SamplyHttpClient, mut resp: Response) -> Result<(), BeamConnectError> { - let body = body::to_bytes(resp.body_mut()).await - .map_err(BeamConnectError::FailedToReadTargetsReply)?; - let http_reply = HttpResponse { - status: resp.status(), - headers: resp.headers().clone(), - body: String::from_utf8(body.to_vec())? +async fn send_reply(task: &TaskRequest, config: &Config, client: &Client, resp: Result) -> Result<(), BeamConnectError> { + let (reply_body, status) = match resp { + Ok(resp) => { + let status = resp.status(); + let headers = resp.headers().clone(); + if !status.is_success() { + warn!("Httptask returned with status {}. Reporting failure to broker.", resp.status()); + // warn!("Response body was: {}", &body); + }; + let body = resp.bytes().await + .map_err(BeamConnectError::FailedToReadTargetsReply)?; + (HttpResponse { + status, + headers, + body: body.to_vec() + }, WorkStatus::Succeeded) + }, + Err(e) => { + warn!("Failed to execute http task. Err: {e}"); + (HttpResponse { + body: b"Error executing http task. See beam connect logs".to_vec(), + status: StatusCode::INTERNAL_SERVER_ERROR, + headers: Default::default(), + }, WorkStatus::PermFailed) + }, }; - let http_reply = serde_json::to_string(&http_reply)?; - let msg = MsgTaskResult { + let msg = TaskResult { from: config.my_app_id.clone().into(), to: vec![task.from.clone()], task: task.id, - status: WorkStatus::Succeeded, + status, metadata: Value::Null, - body: Plain::from(http_reply), + body: reply_body, }; - let req_to_proxy = Request::builder() - .method("PUT") - .uri(format!("{}v1/tasks/{}/results/{}", config.proxy_url, task.id,config.my_app_id.clone())) + debug!("Delivering response to Proxy: {msg:?}"); + let resp = client + .put(format!("{}v1/tasks/{}/results/{}", config.proxy_url, task.id, config.my_app_id.clone())) .header(header::AUTHORIZATION, config.proxy_auth.clone()) - .body(body::Body::from(serde_json::to_vec(&msg)?)) - .map_err( BeamConnectError::HyperBuildError)?; - debug!("Delivering response to Proxy: {:?}, {:?}", msg, req_to_proxy); - let resp = client.request(req_to_proxy).await - .map_err(BeamConnectError::ProxyHyperError)?; + .json(&msg) + .send() + .await + .map_err(BeamConnectError::ProxyReqwestError)?; + if resp.status() != StatusCode::CREATED { return Err(BeamConnectError::ProxyOtherError(format!("Got error code {} trying to submit our result.", resp.status()))); } Ok(()) } -async fn execute_http_task(task: &MsgTaskRequest, config: &Config, client: &SamplyHttpClient) -> Result, BeamConnectError> { - let task_req = task.http_request()?; +// TODO: Take ownership of `task` to save clones +async fn execute_http_task(task: &TaskRequest, config: &Config, client: &Client) -> Result { + let task_req = &task.body; info!("{} | {} {}", task.from, task_req.method, task_req.url); - let target = config.targets_local.get(task_req.url.authority().unwrap()) //TODO unwrap - .ok_or(BeamConnectError::CommunicationWithTargetFailed(String::from("Target not defined")))?; - if !target.allowed.contains(&AppId::try_from(&task.from).or(Err(BeamConnectError::IdNotAuthorizedToAccessUrl(task.from.clone(), task_req.url.clone())))?) { - return Err(BeamConnectError::IdNotAuthorizedToAccessUrl(task.from.clone(), task_req.url.clone())); + let target = config + .targets_local + .get(task_req.url.authority().unwrap()) //TODO unwrap + .ok_or_else(|| { + warn!("Lookup of local target {} failed", task_req.url.authority().unwrap()); + BeamConnectError::CommunicationWithTargetFailed(String::from("Target not defined")) + })?; + match &task.from { + AppOrProxyId::App(app) if target.allowed.contains(app) => {}, + id => return Err(BeamConnectError::IdNotAuthorizedToAccessUrl(id.clone(), task_req.url.clone())), + }; + if task_req.method == Method::CONNECT { + debug!("Connect Request URL: {:?}", task_req.url); } - let uri = Uri::builder() - .scheme(task_req.url.scheme().unwrap().as_str()) - .authority(target.replace.to_owned()) - .path_and_query(task_req.url.path()) - .build().unwrap(); // TODO - - info!("Rewriten to: {} {}", task_req.method, uri); - let mut req = Request::builder() - .method(task_req.method) - .uri(uri); - *req.headers_mut().unwrap() = task_req.headers; - let body = body::Body::from(task_req.body); - let req = req.body(body)?; - debug!("Issuing request: {:?}", req); - let resp = client.request(req).await + let mut uri = Uri::builder(); + // Normal non CONNECT http request replacement + if let Some(scheme) = task_req.url.scheme_str() { + if target.force_https { + uri = uri.scheme(hyper::http::uri::Scheme::HTTPS); + } else { + uri = uri.scheme(scheme); + } + uri = if let Some(path) = target.replace.path { + uri.path_and_query(&format!("/{path}{}", task_req.url.path_and_query().unwrap_or(&PathAndQuery::from_static("")))) + } else { + uri.path_and_query(task_req.url.path_and_query().unwrap_or(&PathAndQuery::from_static("")).as_str()) + }; + } + let uri = uri + .authority(target.replace.authority.to_owned()) + .build()?; + + info!("Rewritten to: {} {}", task_req.method, uri); + let resp = client + .request(task_req.method.clone(), uri.to_string()) + .headers(task_req.headers.clone()) + .body(body::Body::from(task_req.body.clone())) + .send() + .await .map_err(|e| BeamConnectError::CommunicationWithTargetFailed(e.to_string()))?; Ok(resp) } -async fn fetch_requests(config: &Config, client: &SamplyHttpClient) -> Result, BeamConnectError> { - let req_to_proxy = Request::builder() - .uri(format!("{}v1/tasks?to={}&wait_count=1&filter=todo", config.proxy_url, config.my_app_id)) +async fn fetch_requests(config: &Config, client: &Client) -> Result>, BeamConnectError> { + info!("fetching requests from proxy"); + let resp = client + .get(format!("{}v1/tasks?to={}&wait_count=1&filter=todo", config.proxy_url, config.my_app_id)) .header(header::AUTHORIZATION, config.proxy_auth.clone()) .header(header::ACCEPT, "application/json") - .body(body::Body::empty())?; - info!("Requesting {req_to_proxy:?}"); - let mut resp = client.request(req_to_proxy).await - .map_err(BeamConnectError::ProxyHyperError)?; + .send() + .await + .map_err(BeamConnectError::ProxyReqwestError)?; match resp.status() { StatusCode::OK => { info!("Got request: {:?}", resp); }, + StatusCode::GATEWAY_TIMEOUT => return Err(BeamConnectError::ProxyTimeoutError), _ => { return Err(BeamConnectError::ProxyOtherError(format!("Got response code {}", resp.status()))); } } - let bytes = body::to_bytes(resp.body_mut()).await - .map_err(BeamConnectError::ProxyHyperError)?; - let msgs = serde_json::from_slice::>(&bytes); - if let Err(e) = msgs { - warn!("Unable to decode MsgTaskRequest; error: {e}. Content: {}", String::from_utf8_lossy(&bytes)); - return Err(e.into()); - } - let msgs = msgs.unwrap(); - debug!("Broker gave us {} tasks: {:?}", msgs.len(), msgs.first()); - Ok(msgs) + resp.json().await.map_err(|e| { + warn!("Unable to decode TaskRequest; error: {e}."); + BeamConnectError::ProxyOtherError(e.to_string()) + }) } diff --git a/src/main.rs b/src/main.rs index 9f925b2..14dc62a 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,12 +1,14 @@ -use std::{net::SocketAddr, str::FromStr, convert::Infallible, string::FromUtf8Error, error::Error, fmt::Display, collections::{hash_map, HashMap}, sync::Arc}; +use std::{net::SocketAddr, str::FromStr, convert::Infallible, error::Error, sync::Arc, time::Duration}; use config::Config; -use hyper::{body, Body, service::{service_fn, make_service_fn}, Request, Response, Server, header::{HeaderName, self, ToStrError}, Uri, http::uri::Authority, server::conn::AddrStream, Client, client::HttpConnector}; -use hyper_proxy::ProxyConnector; -use hyper_tls::HttpsConnector; -use log::{info, error, debug, warn}; -use shared::http_client::SamplyHttpClient; +use hyper::{body, Body, service::{service_fn, make_service_fn}, Request, Response, Server, server::conn::{AddrStream, Http}, Method}; +use logic_ask::handler_http; +use tracing::{info, debug, warn}; +use tracing_subscriber::{EnvFilter, filter::LevelFilter}; +use crate::errors::BeamConnectError; + +mod shutdown; mod msg; mod example_targets; mod config; @@ -15,10 +17,12 @@ mod structs; mod logic_ask; mod logic_reply; mod banner; +#[cfg(feature = "sockets")] +mod sockets; #[tokio::main] async fn main() -> Result<(), Box>{ - pretty_env_logger::init(); + tracing::subscriber::set_global_default(tracing_subscriber::fmt().with_env_filter(EnvFilter::builder().with_default_directive(LevelFilter::INFO.into()).from_env_lossy()).finish())?; banner::print_banner(); let config = Config::load().await?; let config2 = config.clone(); @@ -35,52 +39,84 @@ async fn main() -> Result<(), Box>{ loop { debug!("Waiting for next request ..."); if let Err(e) = logic_reply::process_requests(config2.clone(), client2.clone()).await { - warn!("Error in processing request: {e}. Will continue with the next one."); + match e { + BeamConnectError::ProxyTimeoutError => { + debug!("{e}"); + }, + BeamConnectError::ProxyReqwestError(e) => { + warn!("Error reaching beam proxy: {e}"); + tokio::time::sleep(Duration::from_secs(10)).await; + } + _ => { + warn!("Error in processing request: {e}. Will continue with the next one."); + } + } } } }); + #[cfg(feature = "sockets")] + sockets::spawn_socket_task_poller(config.clone()); let config = Arc::new(config.clone()); let make_service = make_service_fn(|_conn: &AddrStream| { // let remote_addr = conn.remote_addr(); - let client = client.clone(); let config = config.clone(); async { Ok::<_, Infallible>(service_fn(move |req| - handler_http_wrapper(req, config.clone(), client.clone()))) + handler_http_wrapper(req, config.clone()))) } }); let server = Server::bind(&listen) .serve(make_service) - .with_graceful_shutdown(shared::graceful_shutdown::wait_for_signal()); + .with_graceful_shutdown(crate::shutdown::wait_for_signal()); if let Err(e) = server.await { eprintln!("server error: {}", e); } info!("(2/2) Shutting down gracefully ..."); http_executor.abort(); - http_executor.await.unwrap(); Ok(()) } -async fn handler_http_wrapper( +pub(crate) async fn handler_http_wrapper( req: Request, config: Arc, - client: SamplyHttpClient ) -> Result, Infallible> { - match logic_ask::handler_http(req, config, client).await { - Ok(e) => Ok(e), - Err(e) => Ok(Response::builder().status(e.code).body(body::Body::empty()).unwrap()), + // On https connections we want to emulate that we successfully connected to get the actual http request + if req.method() == Method::CONNECT { + tokio::spawn(async move { + let authority = req.uri().authority().cloned(); + match hyper::upgrade::on(req).await { + Ok(connection) => { + let tls_connection = match config.tls_acceptor.accept(connection).await { + Err(e) => { + warn!("Error accepting tls connection: {e}"); + return; + }, + Ok(s) => s, + }; + Http::new().serve_connection(tls_connection, service_fn(|req| { + let config = config.clone(); + let authority = authority.clone(); + async move { + match handler_http(req, config, authority).await { + Ok(e) => Ok::<_, Infallible>(e), + Err(e) => Ok(Response::builder().status(e.code).body(body::Body::empty()).unwrap()), + } + } + })).await.unwrap_or_else(|e| warn!("Failed to handle upgraded connection: {e}")); + }, + Err(e) => warn!("Failed to upgrade connection: {e}"), + }; + }); + Ok(Response::new(Body::empty())) + } else { + match handler_http(req, config, None).await { + Ok(e) => Ok(e), + Err(e) => Ok(Response::builder().status(e.code).body(body::Body::empty()).unwrap()), + } } -} -async fn shutdown_signal() { - // Wait for the CTRL+C signal - info!("Starting ..."); - tokio::signal::ctrl_c() - .await - .expect("failed to install CTRL+C signal handler"); - info!("(1/2) Shutting down gracefully ..."); } diff --git a/src/msg.rs b/src/msg.rs index a04b937..87117c1 100644 --- a/src/msg.rs +++ b/src/msg.rs @@ -1,39 +1,43 @@ use hyper::{Method, Uri, HeaderMap, StatusCode}; use serde::{Serialize, Deserialize}; -use shared::MsgTaskRequest; -use crate::errors::BeamConnectError; - -#[derive(Serialize,Deserialize)] +#[derive(Serialize,Deserialize, Debug)] pub(crate) struct HttpRequest { - #[serde(with = "hyper_serde")] + #[serde(with = "http_serde::method")] pub(crate) method: Method, - #[serde(with = "hyper_serde")] + #[serde(with = "http_serde::uri")] pub(crate) url: Uri, - #[serde(with = "hyper_serde")] + #[serde(with = "http_serde::header_map")] pub(crate) headers: HeaderMap, - pub(crate) body: String + #[serde(with = "serde_base64")] + pub(crate) body: Vec } -#[derive(Serialize,Deserialize)] +#[derive(Debug, Serialize, Deserialize)] pub(crate) struct HttpResponse { - #[serde(with = "hyper_serde")] + #[serde(with = "http_serde::status_code")] pub(crate) status: StatusCode, - #[serde(with = "hyper_serde")] + #[serde(with = "http_serde::header_map")] pub(crate) headers: HeaderMap, - pub(crate) body: String + #[serde(with = "serde_base64")] + pub(crate) body: Vec } -pub(crate) trait IsValidHttpTask { - fn http_request(&self) -> Result; -} -impl IsValidHttpTask for MsgTaskRequest { - fn http_request(&self) -> Result { - let req_struct: HttpRequest = serde_json::from_str(self.body.body.as_ref().ok_or(BeamConnectError::ReplyInvalid("MsgTaskRequest had no content.".to_string()))?)?; - if false { // TODO - return Err(BeamConnectError::IdNotAuthorizedToAccessUrl(self.from.clone(), req_struct.url)); - } - Ok(req_struct) +// https://github.com/serde-rs/json/issues/360#issuecomment-330095360 +pub mod serde_base64 { + use serde::{Serializer, de, Deserialize, Deserializer}; + use openssl::base64; + + pub fn serialize(bytes: &[u8], serializer: S) -> Result + where S: Serializer + { + serializer.serialize_str(&base64::encode_block(bytes)) + } + + pub fn deserialize<'de, D>(deserializer: D) -> Result, D::Error> + where D: Deserializer<'de> + { + base64::decode_block(<&str>::deserialize(deserializer)?).map_err(de::Error::custom) } } diff --git a/src/shutdown.rs b/src/shutdown.rs new file mode 100644 index 0000000..d9780be --- /dev/null +++ b/src/shutdown.rs @@ -0,0 +1,25 @@ +use tracing::info; + +#[cfg(unix)] +pub async fn wait_for_signal() { + use tokio::signal::unix::{signal, SignalKind}; + let mut sigterm = signal(SignalKind::terminate()) + .expect("Unable to register shutdown handler; are you running a Unix-based OS?"); + let mut sigint = signal(SignalKind::interrupt()) + .expect("Unable to register shutdown handler; are you running a Unix-based OS?"); + let signal = tokio::select! { + _ = sigterm.recv() => "SIGTERM", + _ = sigint.recv() => "SIGINT" + }; + // The following does not print in docker-compose setups but it does when run individually. + // Probably a docker-compose error. + info!("Received signal ({signal}) - shutting down gracefully."); +} + +#[cfg(windows)] +pub async fn wait_for_signal() { + if let Err(e) = tokio::signal::ctrl_c().await { + panic!("Unable to register shutdown handler: {e}."); + } + info!("Received shutdown signal - shutting down gracefully."); +} diff --git a/src/sockets.rs b/src/sockets.rs new file mode 100644 index 0000000..d331834 --- /dev/null +++ b/src/sockets.rs @@ -0,0 +1,267 @@ +use std::{time::Duration, collections::HashSet, sync::Arc, convert::Infallible}; + +use hyper::{header, Request, Body, StatusCode, upgrade::OnUpgrade, http::{HeaderValue, uri::PathAndQuery}, client::conn::Builder, server::conn::Http, service::service_fn, Uri}; +use tokio::{io::AsyncWriteExt, net::TcpStream}; +use tracing::{error, warn, debug, info}; +use beam_lib::{SocketTask, MsgId, AppId, AppOrProxyId}; +use reqwest::Response; + +use crate::{config::Config, errors::BeamConnectError, structs::MyStatusCode}; + + +pub(crate) fn spawn_socket_task_poller(config: Config) { + tokio::spawn(async move { + use BeamConnectError::*; + let mut seen: HashSet = HashSet::new(); + + loop { + let tasks = match poll_socket_task(&config).await { + Ok(tasks) => tasks, + Err(HyperBuildError(e)) => { + error!("{e}"); + error!("This is most likely caused by wrong configuration"); + break; + }, + Err(ProxyTimeoutError) => continue, + Err(e) => { + warn!("Error during socket task polling: {e}"); + tokio::time::sleep(Duration::from_secs(10)).await; + continue; + } + }; + for task in tasks { + if seen.contains(&task.id) { + continue; + } + seen.insert(task.id.clone()); + let AppOrProxyId::App(client) = task.from else { + warn!("Invalid app id skipping"); + continue; + }; + let config_clone = config.clone(); + tokio::spawn(async move { + match connect_proxy(&task.id, &config_clone).await { + Ok(resp) => tunnel(resp, client, &config_clone).await, + Err(e) => { + warn!("{e}"); + }, + }; + }); + } + } + }); +} + +async fn poll_socket_task(config: &Config) -> Result, BeamConnectError> { + let resp = config.client + .get(format!("{}v1/sockets", config.proxy_url)) + .header(header::AUTHORIZATION, config.proxy_auth.clone()) + .header(header::ACCEPT, "application/json") + .send() + .await + .map_err(BeamConnectError::ProxyReqwestError)?; + match resp.status() { + StatusCode::OK => {}, + StatusCode::GATEWAY_TIMEOUT => return Err(BeamConnectError::ProxyTimeoutError), + e => return Err(BeamConnectError::ProxyOtherError(format!("Unexpected status code {e}"))) + }; + resp.json().await.map_err(BeamConnectError::ProxyReqwestError) +} + +async fn connect_proxy(task_id: &MsgId, config: &Config) -> Result { + let resp = config.client + .get(format!("{}v1/sockets/{task_id}", config.proxy_url)) + .header(header::AUTHORIZATION, config.proxy_auth.clone()) + .header(header::UPGRADE, "tcp") + .send() + .await + .map_err(BeamConnectError::ProxyReqwestError)?; + let invalid_status_reason = match resp.status() { + StatusCode::SWITCHING_PROTOCOLS => return Ok(resp), + StatusCode::NOT_FOUND | StatusCode::GONE => { + "Task already expired".to_string() + }, + StatusCode::UNAUTHORIZED => { + "This socket is not for this authorized for this app".to_string() + } + other => other.to_string() + }; + Err(BeamConnectError::ProxyOtherError(invalid_status_reason)) +} + +fn status_to_response(status: StatusCode) -> hyper::Response { + let mut res = hyper::Response::default(); + *res.status_mut() = status; + res +} + +async fn tunnel(proxy: Response, client: AppId, config: &Config) { + let proxy = match proxy.upgrade().await { + Ok(socket) => socket, + Err(e) => { + warn!("Failed to upgrade connection to proxy: {e}"); + return; + }, + }; + let http_err = Http::new() + .http1_only(true) + .http1_keep_alive(true) + .serve_connection(proxy, service_fn(move |req| { + let client2 = client.clone(); + let config2 = config.clone(); + async move { + Ok::<_, Infallible>(execute_http_task(req, &client2, &config2).await.unwrap_or_else(status_to_response)) + } + })) + .with_upgrades() + .await; + + if let Err(e) = http_err { + warn!("Error while serving HTTP connection: {e}"); + } +} + +async fn execute_http_task(mut req: Request, app: &AppId, config: &Config) -> Result, StatusCode> { + let authority = req.uri().authority().expect("Authority is always set by the requesting beam-connect"); + let Some(target) = config.targets_local.get(authority) else { + warn!("Failed to lookup authority {authority}"); + return Err(StatusCode::BAD_REQUEST); + }; + if !target.allowed.contains(&app) { + warn!("App {app} not authorized to access url {}", req.uri()); + return Err(StatusCode::UNAUTHORIZED); + }; + *req.uri_mut() = { + let mut parts = req.uri().to_owned().into_parts(); + if target.force_https { + parts.scheme = Some(hyper::http::uri::Scheme::HTTPS) + } + parts.authority = Some(target.replace.authority.clone()); + if let Some(path) = target.replace.path { + parts.path_and_query = Some(PathAndQuery::try_from(&format!("/{path}{}", parts.path_and_query.as_ref().map(PathAndQuery::as_str).unwrap_or(""))).map_err(|e| { + warn!("Failed to set redirect path: {e}"); + StatusCode::INTERNAL_SERVER_ERROR + })?); + } + Uri::from_parts(parts).map_err(|e| { + warn!("Could not transform uri authority: {e}"); + StatusCode::INTERNAL_SERVER_ERROR + })? + }; + info!("Requesting {} {}", req.method(), req.uri()); + let req_upgrade = if req.headers().contains_key(header::UPGRADE) { + req.extensions_mut().remove::() + } else { + None + }; + + let mut resp = config.client + .execute(req.try_into().expect("This should always convert")) + .await + .map_err(|e| { + warn!("Error executuing http task. Failed handshake with server: {e}"); + StatusCode::BAD_GATEWAY + })?; + + if req_upgrade.is_some() { + tunnel_upgrade(resp.extensions_mut().remove::(), req_upgrade); + } + Ok(convert_to_hyper_response(resp)) +} + +// TODO: Make a PR to add into_parts for reqwest::Response or even a conversion trait impl to avoid clones +fn convert_to_hyper_response(resp: Response) -> hyper::Response { + let mut builder = hyper::http::response::Builder::new() + .status(resp.status()) + .version(resp.version()); + builder.headers_mut().map(|headers| *headers = resp.headers().clone()); + builder.body(hyper::Body::wrap_stream(resp.bytes_stream())).expect("This should always convert") +} + +fn tunnel_upgrade(client: Option, server: Option) { + if let (Some(client), Some(proxy)) = (client, server) { + tokio::spawn(async move { + let (mut client, mut proxy) = match tokio::try_join!(client, proxy) { + Err(e) => { + warn!("Upgrading connection between client and beam-connect failed: {e}"); + return; + }, + Ok(sockets) => sockets + }; + let result = tokio::io::copy_bidirectional(&mut client, &mut proxy).await; + if let Err(e) = result { + debug!("Relaying socket connection ended: {e}"); + } + }); + } +} + +pub(crate) async fn handle_via_sockets(mut req: Request, config: &Arc, target: &AppId, auth: HeaderValue) -> Result, MyStatusCode> { + let resp = config.client + .post(format!("{}v1/sockets/{target}", config.proxy_url)) + .header(header::AUTHORIZATION, auth) + .header(header::UPGRADE, "tcp") + .send() + .await + .map_err(|e| { + warn!("Failed to reach proxy: {e}"); + StatusCode::BAD_GATEWAY + } + )?; + if resp.status() != StatusCode::SWITCHING_PROTOCOLS { + return Err(resp.status().into()); + } + let proxy_socket = resp.upgrade().await.map_err(|e| { + warn!("Failed to upgrade response from proxy to socket: {e}"); + StatusCode::INTERNAL_SERVER_ERROR + })?; + + let (mut sender, proxy_conn) = Builder::new() + .http1_preserve_header_case(true) + .http1_title_case_headers(true) + .handshake(proxy_socket) + .await + .map_err(|e| { + warn!("Error doing handshake with proxy: {e}"); + StatusCode::BAD_GATEWAY + })?; + let req_upgrade = if req.headers().contains_key(header::UPGRADE) { + req.extensions_mut().remove::() + } else { + None + }; + let resp_future = sender.send_request(req); + let resp = if let Some(upgrade) = req_upgrade { + let (resp, proxy_connection) = tokio::join!(resp_future, proxy_conn.without_shutdown()); + match proxy_connection { + Ok(mut proxy_io) => { + tokio::spawn(async move { + let Ok(mut client) = upgrade.await else { + warn!("Failed to upgrade client connection"); + return; + }; + if !proxy_io.read_buf.is_empty() { + if let Err(e) = client.write_all_buf(&mut proxy_io.read_buf).await { + warn!("Failed to send initial bytes from remote to client: {e}"); + } + } + if let Err(e) = tokio::io::copy_bidirectional(&mut client, &mut proxy_io.io).await { + debug!("Error relaying connection from client to proxy: {e}"); + } + }); + }, + Err(e) => { + warn!("Connection failed: {e}"); + }, + }; + resp + } else { + tokio::spawn(proxy_conn); + resp_future.await + }; + let resp = resp.map_err(|e| { + warn!("Failed to send request to proxy: {e}"); + StatusCode::BAD_GATEWAY + })?; + Ok(resp) +} diff --git a/src/structs.rs b/src/structs.rs index 414e1b9..21bc797 100644 --- a/src/structs.rs +++ b/src/structs.rs @@ -1,8 +1,6 @@ use std::{string::FromUtf8Error, error::Error, fmt::Display}; -use hyper::{StatusCode, header::ToStrError, http::uri::Authority}; -use log::error; -use shared::errors::SamplyBeamError; +use hyper::{StatusCode, header::ToStrError}; #[derive(Debug)] pub(crate) struct MyStatusCode { @@ -27,19 +25,6 @@ impl From for StatusCode { } } -impl From for MyStatusCode { - fn from(e: SamplyBeamError) -> Self { - let code = match e { - SamplyBeamError::InvalidBeamId(e) => { - error!("{e}"); - StatusCode::BAD_REQUEST - }, - _ => StatusCode::NOT_IMPLEMENTED, - }; - Self { code } - } -} - impl From for MyStatusCode { fn from(_: FromUtf8Error) -> Self { Self { code: StatusCode::UNPROCESSABLE_ENTITY } diff --git a/tests/base_tests.rs b/tests/base_tests.rs new file mode 100644 index 0000000..f1243e4 --- /dev/null +++ b/tests/base_tests.rs @@ -0,0 +1,84 @@ +use hyper::{StatusCode, header}; +use reqwest::Client; +use serde_json::{Value, json}; + +mod common; +use common::TEST_CLIENT; + +pub async fn test_normal(scheme: &str) { + let res = TEST_CLIENT.get(format!("{scheme}://echo-get?foo1=bar1&foo2=bar2")).send().await.unwrap(); + assert_eq!(res.status(), StatusCode::OK, "Could not make normal request via beam-connect"); + let received: Value = res.json().await.unwrap(); + assert_eq!(received.get("query").unwrap(), &json!({ + "foo1": "bar1", + "foo2": "bar2" + }), "Json did not match"); + assert_eq!(received.get("path"), Some(&json!("/get/"))) +} + +pub async fn test_json(scheme: &str) { + let json = serde_json::json!({ + "foo": [1, 2, {}], + "bar": "foo", + "foobar": false, + }); + let res = TEST_CLIENT.post(format!("{scheme}://echo-post")).json(&json).send().await.unwrap(); + assert_eq!(res.status(), StatusCode::OK, "Could not make json request via beam-connect"); + let received: Value = res.json().await.unwrap(); + assert_eq!(received.get("body").and_then(Value::as_str).and_then(|s| serde_json::from_str::(s).ok()).unwrap(), json, "Json did not match"); + assert_eq!(received.get("path"), Some(&json!("/post/"))) +} + +#[tokio::test] +pub async fn test_empty_authority() { // Test only works with http, so no macro usage + let client = Client::builder() + .danger_accept_invalid_certs(true) + .build() + .unwrap(); + let res = client.get("http://localhost:8062") + .query(&[("foo1","bar1"),("foo2","bar2")]) + .header(header::HOST, "echo-get") + .header(header::PROXY_AUTHORIZATION, "ApiKey app1.proxy1.broker App1Secret") + .send().await + .unwrap(); + assert_eq!(res.status(), StatusCode::OK, "Could not make normal request via beam-connect"); + let received: Value = res.json().await.unwrap(); + assert_eq!(received.get("query").unwrap(), &json!({ + "foo1": "bar1", + "foo2": "bar2" + }), "Json did not match"); + assert_eq!(received.get("path"), Some(&json!("/get/"))) +} + + + +test_http_and_https!{test_normal} +test_http_and_https!{test_json} + +#[cfg(feature = "sockets")] +#[cfg(test)] +mod socket_tests { + use futures_util::{SinkExt, StreamExt}; + use hyper::{header, StatusCode}; + use tokio_tungstenite::{tungstenite::{protocol::Role, Message}, WebSocketStream}; + + use crate::common::TEST_CLIENT; + + #[tokio::test] + pub async fn test_ws() { + let resp = TEST_CLIENT.get(format!("http://ws-echo")) + .header(header::UPGRADE, "websocket") + .header(header::CONNECTION, "upgrade") + .header(header::SEC_WEBSOCKET_VERSION, "13") + .header(header::SEC_WEBSOCKET_KEY, "h/QU7Qscq6DfSTu9aP78HQ==") + .send().await.unwrap(); + assert_eq!(resp.status(), StatusCode::SWITCHING_PROTOCOLS); + let socket = resp.upgrade().await.unwrap(); + let mut stream = WebSocketStream::from_raw_socket(socket, Role::Client, None).await; + let _server_hello = stream.next().await.unwrap().unwrap(); + stream.send(Message::Text("Hello World".to_string())).await.unwrap(); + let res = stream.next().await.unwrap().unwrap(); + assert_eq!(res, Message::Text("Hello World".to_string())); + stream.close(None).await.unwrap(); + } +} diff --git a/tests/common/mod.rs b/tests/common/mod.rs new file mode 100644 index 0000000..8b13e1c --- /dev/null +++ b/tests/common/mod.rs @@ -0,0 +1,34 @@ + +use once_cell::sync::Lazy; +use hyper::{header, http::HeaderValue, HeaderMap}; +use reqwest::{Client, Proxy}; + +pub static TEST_CLIENT: Lazy = Lazy::new(|| { + Client::builder() + .danger_accept_invalid_certs(true) + .proxy(Proxy::all("http://localhost:8062").unwrap()) + .default_headers({ + let mut headers = HeaderMap::new(); + headers.append(header::PROXY_AUTHORIZATION, HeaderValue::from_static("ApiKey app1.proxy1.broker App1Secret")); + headers + }) + .build() + .unwrap() +}); + +#[macro_export] +macro_rules! test_http_and_https { + ($name:ident) => { + paste::paste! { + #[tokio::test] + async fn [<$name _http>]() { + $name("http").await + } + + #[tokio::test] + async fn [<$name _https>]() { + $name("https").await + } + } + }; +}