Skip to content

Commit c143083

Browse files
committed
Merge branch 'main' into cherry-pick-ce-commit-85bb939d469bd820c456e23007095cf3043b9013
2 parents c8e4f1d + f30b2c3 commit c143083

File tree

22 files changed

+754
-97
lines changed

22 files changed

+754
-97
lines changed

integrations/Gemfile.lock

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,7 @@ GIT
77
PATH
88
remote: .
99
specs:
10-
<<<<<<< HEAD
11-
multiwoven-integrations (0.15.10)
12-
=======
1310
multiwoven-integrations (0.16.2)
14-
>>>>>>> 85bb939d (chore(CE): add payload limit to databricks model (#745))
1511
MailchimpMarketing
1612
activesupport
1713
async-websocket

integrations/lib/multiwoven/integrations.rb

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,9 @@
5252
require_relative "integrations/core/base_connector"
5353
require_relative "integrations/core/source_connector"
5454
require_relative "integrations/core/destination_connector"
55+
require_relative "integrations/core/http_helper"
5556
require_relative "integrations/core/http_client"
57+
require_relative "integrations/core/streaming_http_client"
5658
require_relative "integrations/core/query_builder"
5759

5860
# Source
@@ -71,6 +73,7 @@
7173
require_relative "integrations/source/aws_sagemaker_model/client"
7274
require_relative "integrations/source/google_vertex_model/client"
7375
require_relative "integrations/source/http_model/client"
76+
require_relative "integrations/source/open_ai/client"
7477

7578
# Destination
7679
require_relative "integrations/destination/klaviyo/client"

integrations/lib/multiwoven/integrations/core/constants.rb

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,8 @@ module Constants
6363
# google sheets
6464
GOOGLE_SHEETS_SCOPE = "https://www.googleapis.com/auth/drive"
6565
GOOGLE_SPREADSHEET_ID_REGEX = %r{/d/([-\w]{20,})/}.freeze
66+
67+
OPEN_AI_URL = "https://api.openai.com/v1/chat/completions"
6668
end
6769
end
6870
end

integrations/lib/multiwoven/integrations/core/http_client.rb

Lines changed: 2 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -3,40 +3,14 @@
33
module Multiwoven
44
module Integrations::Core
55
class HttpClient
6+
extend HttpHelper
67
class << self
78
def request(url, method, payload: nil, headers: {}, config: {})
89
uri = URI(url)
9-
http = Net::HTTP.new(uri.host, uri.port)
10-
http.use_ssl = (uri.scheme == "https")
11-
12-
# Set timeout if provided
13-
if config[:timeout]
14-
timeout_value = config[:timeout].to_f
15-
http.open_timeout = timeout_value
16-
http.read_timeout = timeout_value
17-
end
18-
10+
http = configure_http(uri, config)
1911
request = build_request(method, uri, payload, headers)
2012
http.request(request)
2113
end
22-
23-
private
24-
25-
def build_request(method, uri, payload, headers)
26-
request_class = case method.upcase
27-
when Constants::HTTP_GET then Net::HTTP::Get
28-
when Constants::HTTP_POST then Net::HTTP::Post
29-
when Constants::HTTP_PUT then Net::HTTP::Put
30-
when Constants::HTTP_PATCH then Net::HTTP::Patch
31-
when Constants::HTTP_DELETE then Net::HTTP::Delete
32-
else raise ArgumentError, "Unsupported HTTP method: #{method}"
33-
end
34-
35-
request = request_class.new(uri)
36-
headers.each { |key, value| request[key] = value }
37-
request.body = payload.to_json if payload && %w[POST PUT PATCH].include?(method.upcase)
38-
request
39-
end
4014
end
4115
end
4216
end
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
# frozen_string_literal: true

module Multiwoven
  module Integrations
    module Core
      # Shared HTTP plumbing, `extend`ed into the HTTP client classes so
      # request construction and connection setup live in one place.
      module HttpHelper
        # Builds a Net::HTTP request object for the given HTTP verb.
        # Headers are copied onto the request; a JSON-encoded body is
        # attached only for POST/PUT/PATCH when a payload is present.
        # Raises ArgumentError for any unsupported verb.
        def build_request(method, uri, payload, headers)
          verb = method.upcase
          request_class = {
            Constants::HTTP_GET => Net::HTTP::Get,
            Constants::HTTP_POST => Net::HTTP::Post,
            Constants::HTTP_PUT => Net::HTTP::Put,
            Constants::HTTP_PATCH => Net::HTTP::Patch,
            Constants::HTTP_DELETE => Net::HTTP::Delete
          }[verb]
          raise ArgumentError, "Unsupported HTTP method: #{method}" unless request_class

          request = request_class.new(uri)
          headers.each { |name, value| request[name] = value }
          request.body = payload.to_json if payload && %w[POST PUT PATCH].include?(verb)
          request
        end

        # Builds a Net::HTTP connection for the URI: SSL is enabled for
        # https schemes, and an optional config[:timeout] (seconds) is
        # applied to both the open and read timeouts.
        def configure_http(uri, config)
          Net::HTTP.new(uri.host, uri.port).tap do |http|
            http.use_ssl = uri.scheme == "https"
            timeout = config[:timeout]
            next unless timeout

            seconds = timeout.to_f
            http.open_timeout = seconds
            http.read_timeout = seconds
          end
        end
      end
    end
  end
end

integrations/lib/multiwoven/integrations/core/source_connector.rb

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,28 @@ def batched_query(sql_query, limit, offset)
3939
# Appending the LIMIT and OFFSET clauses to the SQL query
4040
"#{sql_query} LIMIT #{limit} OFFSET #{offset}"
4141
end
42+
43+
# Issues a buffered (non-streaming) HTTP call via the shared HttpClient.
# Expected option keys: :url, :http_method, :payload, :headers, :config.
def send_request(options = {})
  url, verb = options.values_at(:url, :http_method)
  Multiwoven::Integrations::Core::HttpClient.request(
    url,
    verb,
    payload: options[:payload],
    headers: options[:headers],
    config: options[:config]
  )
end
52+
53+
# Issues a streaming HTTP call via StreamingHttpClient, forwarding every
# response chunk to the caller's block as it arrives.
# Expected option keys: :url, :http_method, :payload, :headers, :config.
def send_streaming_request(options = {})
  url, verb = options.values_at(:url, :http_method)
  Multiwoven::Integrations::Core::StreamingHttpClient.request(
    url,
    verb,
    payload: options[:payload],
    headers: options[:headers],
    config: options[:config]
  ) do |chunk|
    # Hand each streamed chunk straight through to the caller.
    yield chunk if block_given?
  end
end
4264
end
4365
end
4466
end
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
# frozen_string_literal: true

module Multiwoven
  module Integrations
    module Core
      # HTTP client variant that streams the response body instead of
      # buffering it: each body chunk is yielded to the caller as soon
      # as it arrives. Connection setup and request construction are
      # shared with HttpClient via HttpHelper.
      class StreamingHttpClient
        extend HttpHelper

        class << self
          # Performs the request and yields every response-body chunk
          # to the given block (no-op per chunk when no block is given).
          def request(url, method, payload: nil, headers: {}, config: {})
            uri = URI(url)
            connection = configure_http(uri, config)
            streaming_request = build_request(method, uri, payload, headers)
            connection.request(streaming_request) do |response|
              response.read_body do |chunk|
                yield chunk if block_given? # stream each chunk upward
              end
            end
          end
        end
      end
    end
  end
end

integrations/lib/multiwoven/integrations/rollout.rb

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,7 @@
22

33
module Multiwoven
44
module Integrations
5-
<<<<<<< HEAD
6-
VERSION = "0.15.10"
7-
=======
85
VERSION = "0.16.2"
9-
>>>>>>> 85bb939d (chore(CE): add payload limit to databricks model (#745))
106

117
ENABLED_SOURCES = %w[
128
Snowflake
@@ -24,6 +20,7 @@ module Integrations
2420
AwsSagemakerModel
2521
VertexModel
2622
HttpModel
23+
OpenAI
2724
].freeze
2825

2926
ENABLED_DESTINATIONS = %w[

integrations/lib/multiwoven/integrations/source/http_model/client.rb

Lines changed: 63 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -5,24 +5,17 @@ module HttpModel
55
include Multiwoven::Integrations::Core
66
class Client < SourceConnector
77
def check_connection(connection_config)
8-
connection_config = connection_config.with_indifferent_access
9-
url_host = connection_config[:url_host]
10-
http_method = connection_config[:http_method]
11-
headers = connection_config[:headers]
12-
payload = JSON.parse(connection_config[:request_format])
13-
config = connection_config[:config]
14-
config[:timeout] ||= 30
15-
response = send_request(url_host, http_method, payload, headers, config)
16-
if success?(response)
17-
success_status
18-
else
19-
failure_status(nil)
20-
end
8+
connection_config = prepare_config(connection_config)
9+
response = send_request(
10+
url: connection_config[:url_host],
11+
http_method: connection_config[:http_method],
12+
payload: JSON.parse(connection_config[:request_format]),
13+
headers: connection_config[:headers],
14+
config: connection_config[:config]
15+
)
16+
success?(response) ? success_status : failure_status(nil)
2117
rescue StandardError => e
22-
handle_exception(e, {
23-
context: "HTTP MODEL:CHECK_CONNECTION:EXCEPTION",
24-
type: "error"
25-
})
18+
handle_exception(e, { context: "HTTP MODEL:CHECK_CONNECTION:EXCEPTION", type: "error" })
2619
failure_status(e)
2720
end
2821

@@ -31,40 +24,66 @@ def discover(_connection_config = nil)
3124
catalog = build_catalog(catalog_json)
3225
catalog.to_multiwoven_message
3326
rescue StandardError => e
34-
handle_exception(e, {
35-
context: "HTTP MODEL:DISCOVER:EXCEPTION",
36-
type: "error"
37-
})
27+
handle_exception(e, { context: "HTTP MODEL:DISCOVER:EXCEPTION", type: "error" })
3828
end
3929

4030
def read(sync_config)
41-
connection_config = sync_config.source.connection_specification
42-
connection_config = connection_config.with_indifferent_access
31+
connection_config = prepare_config(sync_config.source.connection_specification)
32+
stream = connection_config[:is_stream] ||= false
4333
# The server checks the ConnectorQueryType.
4434
# If it's "ai_ml," the server calculates the payload and passes it as a query in the sync config model protocol.
4535
# This query is then sent to the AI/ML model.
46-
payload = JSON.parse(sync_config.model.query)
47-
run_model(connection_config, payload)
36+
payload = parse_json(sync_config.model.query)
37+
38+
if stream
39+
run_model_stream(connection_config, payload) { |message| yield message if block_given? }
40+
else
41+
run_model(connection_config, payload)
42+
end
4843
rescue StandardError => e
49-
handle_exception(e, {
50-
context: "HTTP MODEL:READ:EXCEPTION",
51-
type: "error"
52-
})
44+
handle_exception(e, { context: "HTTP MODEL:READ:EXCEPTION", type: "error" })
5345
end
5446

5547
private
5648

49+
# Normalizes a connector configuration: wraps it for indifferent
# (string/symbol) key access and applies a default 30-second HTTP
# timeout. The connector spec marks `config` as optional, so a missing
# :config section is defaulted to an empty hash instead of raising
# NoMethodError on nil.
def prepare_config(config)
  config.with_indifferent_access.tap do |conf|
    conf[:config] ||= {}
    conf[:config][:timeout] ||= 30
  end
end
54+
55+
# Parses a JSON string into a Ruby object. Malformed input is logged
# via handle_exception and an empty hash is returned so callers can
# continue with a harmless payload.
def parse_json(raw_json)
  JSON.parse(raw_json)
rescue JSON::ParserError => e
  handle_exception(e, { context: "HTTP MODEL:PARSE_JSON:EXCEPTION", type: "error" })
  {}
end
61+
5762
def run_model(connection_config, payload)
58-
connection_config = connection_config.with_indifferent_access
59-
url_host = connection_config[:url_host]
60-
headers = connection_config[:headers]
61-
config = connection_config[:config]
62-
http_method = connection_config[:http_method]
63-
config[:timeout] ||= 30
64-
response = send_request(url_host, http_method, payload, headers, config)
63+
response = send_request(
64+
url: connection_config[:url_host],
65+
http_method: connection_config[:http_method],
66+
payload: payload,
67+
headers: connection_config[:headers],
68+
config: connection_config[:config]
69+
)
6570
process_response(response)
6671
rescue StandardError => e
67-
handle_exception(e, context: "HTTP MODEL:RUN_MODEL:EXCEPTION", type: "error")
72+
handle_exception(e, { context: "HTTP MODEL:RUN_MODEL:EXCEPTION", type: "error" })
73+
end
74+
75+
# Streams the model response: issues a streaming request built from the
# connection config and converts each received chunk into multiwoven
# messages, yielding them upward to the caller.
def run_model_stream(connection_config, payload)
  request_options = {
    url: connection_config[:url_host],
    http_method: connection_config[:http_method],
    payload: payload,
    headers: connection_config[:headers],
    config: connection_config[:config]
  }
  send_streaming_request(**request_options) do |chunk|
    process_streaming_response(chunk) do |message|
      yield message if block_given?
    end
  end
rescue StandardError => e
  handle_exception(e, { context: "HTTP MODEL:RUN_STREAM_MODEL:EXCEPTION", type: "error" })
end
6988

7089
def process_response(response)
@@ -74,16 +93,15 @@ def process_response(response)
7493
else
7594
create_log_message("HTTP MODEL:RUN_MODEL", "error", "request failed: #{response.body}")
7695
end
96+
rescue StandardError => e
97+
handle_exception(e, { context: "HTTP MODEL:PROCESS_RESPONSE:EXCEPTION", type: "error" })
7798
end
7899

79-
def send_request(url, http_method, payload, headers, config)
80-
Multiwoven::Integrations::Core::HttpClient.request(
81-
url,
82-
http_method,
83-
payload: payload,
84-
headers: headers,
85-
config: config
86-
)
100+
# Converts one streamed response chunk (a JSON document) into a
# single-element array of multiwoven record messages and yields it.
# A malformed chunk is logged and skipped rather than aborting the stream.
def process_streaming_response(chunk)
  parsed = JSON.parse(chunk)
  if block_given?
    record = RecordMessage.new(data: parsed, emitted_at: Time.now.to_i)
    yield [record.to_multiwoven_message]
  end
rescue StandardError => e
  handle_exception(e, { context: "HTTP MODEL:PROCESS_STREAMING_RESPONSE:EXCEPTION", type: "error" })
end
88106
end
89107
end

integrations/lib/multiwoven/integrations/source/http_model/config/meta.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
"title": "HTTP Model Endpoint",
55
"connector_type": "source",
66
"category": "AI Model",
7-
"documentation_url": "https://docs.mutliwoven.com",
7+
"documentation_url": "https://docs.mutltiwoven.com",
88
"github_issue_label": "source-http-model",
99
"icon": "icon.svg",
1010
"license": "MIT",

integrations/lib/multiwoven/integrations/source/http_model/config/spec.json

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
"$schema": "http://json-schema.org/draft-07/schema#",
77
"title": "HTTP Model Endpoint",
88
"type": "object",
9-
"required": ["url_host"],
9+
"required": ["url_host", "http_method"],
1010
"properties": {
1111
"http_method": {
1212
"type": "string",
@@ -19,10 +19,17 @@
1919
"title": "URL",
2020
"order": 1
2121
},
22+
"is_stream": {
23+
"type": "boolean",
24+
"title": "Streaming Enabled",
25+
"description": "Enables data streaming for such as chat, when supported by the model. When true, messages and model data are processed in chunks for immediate delivery, enhancing responsiveness. Default is false, processing only after the entire response is received.",
26+
"default": false,
27+
"order": 2
28+
},
2229
"headers": {
2330
"title": "HTTP Headers",
2431
"description": "Custom headers to include in the HTTP request. Useful for authentication, content type specifications, and other request metadata.",
25-
"order": 2,
32+
"order": 3,
2633
"additionalProperties": {
2734
"type": "string"
2835
},
@@ -42,21 +49,21 @@
4249
"order": 0
4350
}
4451
},
45-
"order": 3
52+
"order": 4
4653
},
4754
"request_format": {
4855
"title": "Request Format",
4956
"description": "Sample Request Format",
5057
"type": "string",
5158
"x-request-format": true,
52-
"order": 4
59+
"order": 5
5360
},
5461
"response_format": {
5562
"title": "Response Format",
5663
"description": "Sample Response Format",
5764
"type": "string",
5865
"x-response-format": true,
59-
"order": 5
66+
"order": 6
6067
}
6168
}
6269
}

0 commit comments

Comments
 (0)