Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

(elixir) use cbor on tcp transport serialization, introduce support for padding on secure channel #8226

Merged
merged 2 commits into from
Jun 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -520,7 +520,7 @@ defmodule Ockam.SecureChannel.Channel do
defp continue_handshake({:continue, key_exchange_state}, state) do
with {:ok, data, next} <- XX.out_payload(key_exchange_state) do
msg = %{
payload: :bare.encode(data, :data),
payload: data,
onward_route: state.peer_route,
return_route: [state.inner_address]
}
Expand All @@ -531,55 +531,54 @@ defmodule Ockam.SecureChannel.Channel do
end

defp handle_inner_message_impl(message, %Channel{channel_state: %Handshaking{xx: xx}} = state) do
with {:ok, data} <- bare_decode_strict(message.payload, :data),
{:ok, next} <- XX.in_payload(xx, data) do
with {:ok, next} <- XX.in_payload(xx, message.payload) do
continue_handshake(next, %Channel{state | peer_route: message.return_route})
end
end

defp handle_inner_message_impl(message, %Channel{channel_state: channel_state} = state) do
with {:ok, ciphertext} <- bare_decode_strict(message.payload, :data),
{:ok, plaintext, decrypt_st} <-
Decryptor.decrypt("", ciphertext, channel_state.decrypt_st) do
case Messages.decode(plaintext) do
{:ok, %Messages.Payload{} = payload} ->
message = struct(Ockam.Message, Map.from_struct(payload))

handle_decrypted_message(message, %Channel{
state
| channel_state: %{channel_state | decrypt_st: decrypt_st}
})

{:ok, :close} ->
Logger.debug("Peer closed secure channel, terminating #{inspect(state.address)}")
{:stop, :normal, channel_state}

## TODO: add tests
{:ok, %Messages.RefreshCredentials{contact: contact, credentials: credentials}} ->
with {:ok, peer_identity, peer_identity_id} <- Identity.validate_contact_data(contact),
true <- peer_identity_id == channel_state.peer_identity_id,
:ok <- process_credentials(credentials, peer_identity_id, state.authorities) do
{:ok,
%Channel{
state
| channel_state: %{
channel_state
| peer_identity: peer_identity,
decrypt_st: decrypt_st
}
}}
else
error ->
Logger.warning("Invalid credential refresh: #{inspect(error)}")
{:stop, {:error, :invalid_credential_refresh}, state}
end

{:error, reason} ->
{:error, reason}
end
else
# The message couldn't be decrypted. State remains unchanged
case Decryptor.decrypt("", message.payload, channel_state.decrypt_st) do
{:ok, plaintext, decrypt_st} ->
case Messages.decode(plaintext) do
{:ok, %Messages.Payload{} = payload} ->
message = struct(Ockam.Message, Map.from_struct(payload))

handle_decrypted_message(message, %Channel{
state
| channel_state: %{channel_state | decrypt_st: decrypt_st}
})

{:ok, :close} ->
Logger.debug("Peer closed secure channel, terminating #{inspect(state.address)}")
{:stop, :normal, channel_state}

## TODO: add tests
{:ok, %Messages.RefreshCredentials{contact: contact, credentials: credentials}} ->
with {:ok, peer_identity, peer_identity_id} <-
Identity.validate_contact_data(contact),
true <- peer_identity_id == channel_state.peer_identity_id,
:ok <- process_credentials(credentials, peer_identity_id, state.authorities) do
{:ok,
%Channel{
state
| channel_state: %{
channel_state
| peer_identity: peer_identity,
decrypt_st: decrypt_st
}
}}
else
error ->
Logger.warning("Invalid credential refresh: #{inspect(error)}")
{:stop, {:error, :invalid_credential_refresh}, state}
end

{:error, reason} ->
{:error, reason}
end

error ->
# The message couldn't be decrypted. State remains unchanged
Logger.warning("Failed to decrypt message, discarded: #{inspect(error)}")
{:ok, state}
end
Expand Down Expand Up @@ -627,7 +626,6 @@ defmodule Ockam.SecureChannel.Channel do
defp send_payload_over_encrypted_channel(payload, encrypt_st, peer_route) do
with {:ok, encoded} <- Messages.encode(payload),
{:ok, ciphertext, encrypt_st} <- Encryptor.encrypt("", encoded, encrypt_st) do
ciphertext = :bare.encode(ciphertext, :data)
envelope = %{onward_route: peer_route, return_route: [], payload: ciphertext}
Router.route(envelope)
{:ok, encrypt_st}
Expand Down Expand Up @@ -656,11 +654,4 @@ defmodule Ockam.SecureChannel.Channel do

Keyword.merge(opts, worker_mod: __MODULE__, worker_options: worker_opts)
end

defp bare_decode_strict(data, type) do
case :bare.decode(data, type) do
{:ok, result, ""} -> {:ok, result}
error -> {:error, {:invalid_bare_data, type, error}}
end
end
end
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,10 @@ defmodule Ockam.SecureChannel.Messages do
"""
use TypedStruct

@address_schema {:struct,
@address_schema {:struct_values,
%{
type: %{key: 1, schema: :integer, required: true},
value: %{key: 2, schema: :charlist, required: true}
type: %{key: 0, schema: :integer, required: true},
value: %{key: 1, schema: :binary, required: true}
}}
def from_cbor_term(term) do
addr = TypedCBOR.from_cbor_term(@address_schema, term)
Expand Down Expand Up @@ -70,18 +70,36 @@ defmodule Ockam.SecureChannel.Messages do
end
end

@enum_schema {:variant_enum,
[
{Ockam.SecureChannel.Messages.Payload, 0},
{Ockam.SecureChannel.Messages.RefreshCredentials, 1},
close: 2
]}
defmodule PaddedMessage do
@moduledoc """
Top-level secure channel message, with padding support.
"""
use TypedStruct

@enum_schema {:variant_enum,
[
{Ockam.SecureChannel.Messages.Payload, 0},
{Ockam.SecureChannel.Messages.RefreshCredentials, 1},
close: 2
]}
typedstruct do
plugin(TypedCBOR.Plugin, encode_as: :list)

field(:message, %Ockam.SecureChannel.Messages.Payload{} | %RefreshCredentials{} | :close,
minicbor: [key: 0, schema: @enum_schema]
)

field(:padding, binary(), minicbor: [key: 1, schema: :binary])
end
end

def decode(encoded) do
TypedCBOR.decode_strict(@enum_schema, encoded)
with {:ok, %PaddedMessage{message: message}} <- PaddedMessage.decode_strict(encoded) do
{:ok, message}
end
end

def encode(msg) do
TypedCBOR.encode(@enum_schema, msg)
PaddedMessage.encode(%PaddedMessage{message: msg, padding: <<>>})
end
end
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ defmodule Ockam.Transport.TCP.Client do

alias Ockam.Address
alias Ockam.Message
alias Ockam.Transport.TCP
alias Ockam.Transport.TCP.TransportMessage
alias Ockam.Transport.TCPAddress
alias Ockam.Wire

Expand Down Expand Up @@ -47,13 +47,17 @@ defmodule Ockam.Transport.TCP.Client do
case :gen_tcp.connect(inet_address, port, [
:binary,
protocol,
active: @active,
send_timeout: @send_timeout,
packet: 2,
nodelay: true
nodelay: true,
active: false
]) do
{:ok, socket} ->
# Connection Header, version "1"
:ok = :gen_tcp.send(socket, <<1>>)
{:ok, <<1>>} = :gen_tcp.recv(socket, 1, 5000)

:gen_tcp.controlling_process(socket, self())
:ok = :inet.setopts(socket, active: @active, packet: 4)

state =
Map.merge(state, %{
Expand Down Expand Up @@ -95,7 +99,7 @@ defmodule Ockam.Transport.TCP.Client do
@impl true
def handle_info({:tcp, socket, data}, %{socket: socket} = state) do
## TODO: send/receive message in multiple TCP packets
case Wire.decode(data, :tcp) do
case TransportMessage.decode(data) do
{:ok, message} ->
forwarded_message =
message
Expand Down Expand Up @@ -170,16 +174,8 @@ defmodule Ockam.Transport.TCP.Client do
defp encode_and_send_over_tcp(message, state) do
forwarded_message = Message.forward(message)

with {:ok, encoded_message} <- Wire.encode(forwarded_message) do
## TODO: send/receive message in multiple TCP packets
case byte_size(encoded_message) <= TCP.packed_size_limit() do
true ->
send_over_tcp(encoded_message, state)

false ->
Logger.error("Message to big for TCP: #{inspect(message)}")
{:error, {:message_too_big, message}}
end
with {:ok, encoded_message} <- TransportMessage.encode(forwarded_message) do
send_over_tcp(encoded_message, state)
end
end

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ defmodule Ockam.Transport.TCP.Handler do

alias Ockam.Message
alias Ockam.Telemetry
alias Ockam.Transport.TCP
alias Ockam.Transport.TCP.TransportMessage

require Logger

Expand All @@ -28,11 +28,16 @@ defmodule Ockam.Transport.TCP.Handler do

{:ok, socket} = :ranch.handshake(ref, ranch_options)

# Header, protocol version "1" must be the first thing exchanged.
# It isn't send anymore after the initial exchange.
transport.send(socket, <<1>>)
{:ok, <<1>>} = transport.recv(socket, 1, 5000)

:ok =
:inet.setopts(socket, [
{:active, @active},
{:send_timeout, @send_timeout},
{:packet, 2},
{:packet, 4},
{:nodelay, true}
])

Expand Down Expand Up @@ -91,22 +96,21 @@ defmodule Ockam.Transport.TCP.Handler do
metadata: %{address: address}
)

case Ockam.Wire.decode(data, :tcp) do
case TransportMessage.decode(data) do
{:ok, decoded} ->
forwarded_message =
decoded
|> Message.trace(address)

send_to_router(forwarded_message)
Telemetry.emit_event(function_name, metadata: %{name: "decoded_data"})
{:noreply, state, idle_timeout}

{:error, %Ockam.Wire.DecodeError{} = e} ->
{:error, e} ->
start_time = Telemetry.emit_start_event(function_name)
Telemetry.emit_exception_event(function_name, start_time, e)
raise e
{:stop, {:error, e}, state}
end

{:noreply, state, idle_timeout}
end

def handle_info({:tcp_closed, socket}, %{socket: socket, transport: transport} = state) do
Expand Down Expand Up @@ -165,23 +169,8 @@ defmodule Ockam.Transport.TCP.Handler do
) do
forwarded_message = Message.forward(message)

case Ockam.Wire.encode(forwarded_message) do
{:ok, encoded} ->
## TODO: send/receive message in multiple TCP packets
case byte_size(encoded) <= TCP.packed_size_limit() do
true ->
tcp_wrapper.wrap_tcp_call(transport, :send, [socket, encoded])

false ->
Logger.error("Message to big for TCP: #{inspect(message)}")
raise {:message_too_big, message}
end

a ->
Logger.error("TCP transport send error #{inspect(a)}")
raise a
end

{:ok, encoded} = TransportMessage.encode(forwarded_message)
:ok = tcp_wrapper.wrap_tcp_call(transport, :send, [socket, encoded])
{:ok, state}
end

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ if Code.ensure_loaded?(:ranch) do
transport = :ranch_tcp
transport_options = :ranch.normalize_opts(port: port, ip: ip)
protocol = Ockam.Transport.TCP.Handler
protocol_options = [packet: 2, nodelay: true, handler_options: handler_options]
protocol_options = [nodelay: true, handler_options: handler_options]

with {:ok, _apps} <- Application.ensure_all_started(:ranch) do
{:ok, listener_address} =
Expand Down
Loading
Loading