Skip to content

Commit

Permalink
feat(CE): add sftp source connector (#558)
Browse files Browse the repository at this point in the history
Co-authored-by: TivonB-AI2 <124182151+TivonB-AI2@users.noreply.github.com>
  • Loading branch information
github-actions[bot] and TivonB-AI2 authored Jan 3, 2025
1 parent 90f8e1b commit 291466b
Show file tree
Hide file tree
Showing 8 changed files with 389 additions and 2 deletions.
2 changes: 1 addition & 1 deletion integrations/Gemfile.lock
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ GIT
PATH
remote: .
specs:
multiwoven-integrations (0.16.2)
multiwoven-integrations (0.17.0)
MailchimpMarketing
activesupport
async-websocket
Expand Down
1 change: 1 addition & 0 deletions integrations/lib/multiwoven/integrations.rb
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@
require_relative "integrations/source/google_vertex_model/client"
require_relative "integrations/source/http_model/client"
require_relative "integrations/source/open_ai/client"
require_relative "integrations/source/sftp/client"

# Destination
require_relative "integrations/destination/klaviyo/client"
Expand Down
3 changes: 2 additions & 1 deletion integrations/lib/multiwoven/integrations/rollout.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

module Multiwoven
module Integrations
VERSION = "0.16.2"
VERSION = "0.17.0"

ENABLED_SOURCES = %w[
Snowflake
Expand All @@ -21,6 +21,7 @@ module Integrations
VertexModel
HttpModel
OpenAI
Sftp
].freeze

ENABLED_DESTINATIONS = %w[
Expand Down
133 changes: 133 additions & 0 deletions integrations/lib/multiwoven/integrations/source/sftp/client.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
# frozen_string_literal: true

module Multiwoven::Integrations::Source
module Sftp
include Multiwoven::Integrations::Core
class Client < SourceConnector
# Checks that the SFTP server can be reached with the given settings and
# that the configured remote file can be stat'ed.
#
# connection_config - Hash-like connection settings; converted with
#                     with_indifferent_access before use.
#
# Returns success_status when the remote file is found, otherwise the
# value of failure_status. Exceptions are reported through
# handle_exception and turned into a failure status instead of being
# re-raised.
def check_connection(connection_config)
  connection_config = connection_config.with_indifferent_access
  create_connection(connection_config)
  # NOTE(review): Net::SFTP's stat! is expected to raise when the path is
  # missing, so the else branch is only a defensive fallback.
  if @sftp.stat!(@remote_file_path)
    success_status
  else
    failure_status(nil)
  end
rescue StandardError => e
  handle_exception(e, {
    context: "SFTP:CHECK_CONNECTION:EXCEPTION",
    type: "error"
  })
  failure_status(e)
ensure
  # create_connection allocates a local Tempfile as a side effect; a
  # connection check never reads it, so release it right away.
  @tempfile&.close!
end

# Builds the catalog for the configured remote CSV file.
#
# The file is downloaded into the local Tempfile, its header is inspected
# through DuckDB's read_csv_auto, and the column names are handed to
# create_streams.
#
# connection_config - Hash-like connection settings.
#
# Returns the catalog as a multiwoven message, or the result of
# handle_exception when anything raises. The Tempfile is always released.
def discover(connection_config)
  duckdb = create_connection(connection_config.with_indifferent_access)
  @sftp.download!(@remote_file_path, @tempfile.path)
  result = duckdb.query("SELECT * FROM read_csv_auto('#{@tempfile.path}')")
  column_names = result.columns.map(&:name)
  Catalog.new(streams: create_streams(column_names)).to_multiwoven_message
rescue StandardError => e
  handle_exception(e, { context: "SFTP:DISCOVER:EXCEPTION", type: "error" })
ensure
  @tempfile&.close!
end

# Runs the sync's model query against the remote CSV file.
#
# sync_config - sync configuration exposing source.connection_specification,
#               model.query, limit, offset, sync_id and sync_run_id.
#
# When either limit or offset is set, the query is first passed through
# batched_query (defined outside this method).
#
# Returns whatever query(conn, sql) produces, or the result of
# handle_exception when anything raises.
def read(sync_config)
  connection_config = sync_config.source.connection_specification
  connection_config = connection_config.with_indifferent_access
  conn = create_connection(connection_config)
  sql = sync_config.model.query
  sql = batched_query(sql, sync_config.limit, sync_config.offset) unless sync_config.limit.nil? && sync_config.offset.nil?
  query(conn, sql)
rescue StandardError => e
  handle_exception(e, {
    context: "SFTP:READ:EXCEPTION",
    type: "error",
    sync_id: sync_config.sync_id,
    sync_run_id: sync_config.sync_run_id
  })
ensure
  # create_connection allocates a fresh Tempfile on every call; without
  # this each read would leave a downloaded copy behind on disk.
  @tempfile&.close!
end

private

# Prepares everything a request needs: the remote path and local Tempfile
# (via initialize_file_path), the SFTP session stored in @sftp, and an
# in-memory DuckDB connection.
#
# Returns the DuckDB connection after running INSTALL_HTTPFS_QUERY on it.
def create_connection(connection_config)
  initialize_file_path(connection_config)
  @sftp = with_sftp_client(connection_config)
  DuckDB::Database.open.connect.tap do |duckdb|
    duckdb.execute(INSTALL_HTTPFS_QUERY)
  end
end

# Derives the remote file location from the connection settings and
# allocates the local Tempfile the remote file will be downloaded into.
#
# Sets @remote_file_path to "<file_path>/<file_name>.<format_type>" and
# @tempfile to a new Tempfile named after the remote file's basename.
def initialize_file_path(connection_config)
  directory = connection_config[:file_path]
  file = "#{connection_config[:file_name]}.#{connection_config[:format_type]}"
  @remote_file_path = File.join(directory, file)
  @tempfile = Tempfile.new(File.basename(@remote_file_path))
end

# Opens an SFTP session with password authentication.
#
# connection_config - settings providing :host, :username, :password and
#                     optionally :port (22 when absent).
# block             - optional block forwarded to Net::SFTP.start.
#
# Returns whatever Net::SFTP.start returns.
def with_sftp_client(connection_config, &block)
  host = connection_config[:host]
  user = connection_config[:username]
  ssh_options = {
    password: connection_config[:password],
    port: connection_config.fetch(:port, 22)
  }
  Net::SFTP.start(host, user, **ssh_options, &block)
end

# Executes the query on the given connection and converts the result into
# an array of row hashes via hash_array_values.
def get_results(conn, query)
  hash_array_values(conn.query(query))
end

# Downloads the remote file and runs a restricted SELECT against it.
#
# Only statements of the form
#   SELECT <columns | *> FROM <table> [LIMIT n] [OFFSET n]
# are accepted. The table name is ignored: the data always comes from the
# downloaded copy of the remote file, read through DuckDB's read_csv_auto.
#
# conn  - DuckDB connection passed on to get_results.
# query - SQL string to translate.
#
# Returns an Array with one multiwoven record message per result row.
# Raises ArgumentError when the query does not have the supported shape.
def query(conn, query)
  # `\*` is listed explicitly because `[\w,\s]+` cannot match it.
  query_regex = /\ASELECT\s+(?<columns>\*|[\w,\s]+)\s+FROM\s+\w+\s*(?:LIMIT\s+(?<limit>\d+))?\s*(?:OFFSET\s+(?<offset>\d+))?\z/i
  match = query.to_s.match(query_regex)
  unless match
    raise ArgumentError,
          "Unsupported SFTP query; expected: SELECT <columns> FROM <table> [LIMIT n] [OFFSET n]"
  end

  columns = match[:columns]
  offset = match[:offset].to_i # nil.to_i is 0, so a missing OFFSET means "from the start"
  limit = match[:limit]&.to_i
  @sftp.download!(@remote_file_path, @tempfile.path)
  adjusted_query = "SELECT #{columns} FROM read_csv_auto('#{@tempfile.path}') OFFSET #{offset}"
  adjusted_query += " LIMIT #{limit}" if limit
  get_results(conn, adjusted_query).map do |row|
    RecordMessage.new(data: row, emitted_at: Time.now.to_i).to_multiwoven_message
  end
end

# Converts a query result into an array of hashes, one per row, keyed by
# the result's column names.
#
# describe - result object responding to #columns (each with #name) and
#            enumerable over rows of values.
def hash_array_values(describe)
  column_names = describe.columns.map(&:name)
  describe.map { |values| column_names.zip(values).to_h }
end

# Builds one protocol Stream per table entry returned by group_by_table.
#
# records - Array of column names.
#
# Each stream is named after the entry's :tablename, uses the "fetch"
# action, and carries the JSON schema derived from the entry's :columns.
def create_streams(records)
  group_by_table(records).each_value.map do |table|
    Multiwoven::Integrations::Protocol::Stream.new(
      name: table[:tablename],
      action: StreamAction["fetch"],
      json_schema: convert_to_json_schema(table[:columns])
    )
  end
end

# Groups the discovered column names under the single table this
# connector exposes: the remote file.
#
# records - Array of column names read from the file header.
#
# Returns a Hash with one entry, keyed by the remote file path, of the
# form { tablename: <remote path>, columns: [<column descriptors>] }.
# Every column is described as an optional string. Returns an empty Hash
# when there are no columns.
#
# All columns are collected into one entry so that the file is described
# as a single table rather than one same-named entry per column.
def group_by_table(records)
  return {} if records.empty?

  table_name = @remote_file_path
  columns = records.map do |column|
    { column_name: column, type: "string", optional: true }
  end
  { table_name => { tablename: table_name, columns: columns } }
end
end
end
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
{
"data": {
"name": "Sftp",
"title": "SFTP",
"connector_type": "source",
"category": "File Storage",
"documentation_url": "https://docs.multiwoven.com",
"github_issue_label": "source-sftp",
"icon": "icon.svg",
"license": "MIT",
"release_stage": "alpha",
"support_level": "community",
"tags": ["language:ruby", "multiwoven"]
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
{
"documentation_url": "https://docs.multiwoven.com/integrations/sources/sftp",
"stream_type": "dynamic",
"connection_specification": {
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "SFTP",
"required": ["host", "username", "password", "file_path", "format_type" ],
"properties": {
"host": {
"title": "Host",
"description": "Hostname of the SFTP server.",
"type": "string",
"order": 0
},
"port": {
"title": "Port",
"description": "Port of the SFTP server.",
"type": "integer",
"minimum": 0,
"maximum": 65536,
"default": 22,
"order": 1
},
"username": {
"title": "User Name",
"description": "Username to use to access the SFTP server.",
"type": "string",
"order": 2
},
"password": {
"title": "Password",
"description": "Password associated with the username.",
"type": "string",
"multiwoven_secret": true,
"order": 3
},
"file_path": {
"title": "File path",
"type": "string",
"description": "Path to the directory where file is stored.",
"order": 4
},
"file_name": {
"title": "File Name",
"type": "string",
"description": "Name of the file to be read, without its extension.",
"order": 5
},
"format_type": {
"title": "File Format Type",
"type": "string",
"description": "Format of the source file.",
"enum": ["csv"],
"default": "csv",
"order": 6
}
}
}
}
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading

0 comments on commit 291466b

Please sign in to comment.