Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,43 +3,43 @@
module ForestAdminAgent
module Routes
module Workflow
# Forwards workflow-execution traffic from the agent to the workflow executor.
# Mounted only when the integrator sets `workflow_executor_url`
# Generic proxy: forwards any sub-path/verb under AGENT_PREFIX to EXECUTOR_PREFIX, so a
# new executor route needs no change here (PRD-567). Mounted only when `workflow_executor_url` is set.
class WorkflowExecutorProxy < AbstractAuthenticatedRoute
AGENT_PREFIX = '/_internal/workflow-executions'.freeze
EXECUTOR_PREFIX = '/runs'.freeze
FORWARDED_HEADERS = %w[Authorization Cookie].freeze
ROUTING_KEYS = %w[run_id route_alias controller action format].freeze
# Never forwarded (request or response): hop-by-hop, Host, and body-framing headers.
# Faraday and `render json:` set their own length and de/recompress the body, so relaying
# the upstream length/encoding would mismatch the bytes we actually send — and forwarding
# accept-encoding would disable Faraday's transparent gzip decompression.
SKIPPED_HEADERS = %w[
connection keep-alive transfer-encoding upgrade te trailer
proxy-authenticate proxy-authorization host
content-length content-encoding accept-encoding
].freeze
# Fragments that could escape EXECUTOR_PREFIX once decoded.
UNSAFE_PATH_FRAGMENTS = ['..', '%2e', '%2E', '\\', "\0"].freeze
OPEN_TIMEOUT = 2
GET_TIMEOUT = 10
TRIGGER_TIMEOUT = 120
REQUEST_TIMEOUT = 120

def setup_routes
return self unless executor_configured?

add_route(
'forest_workflow_run_show',
'get',
"#{AGENT_PREFIX}/:run_id",
->(args) { handle_request(:get, args) }
)
add_route(
'forest_workflow_run_trigger',
'post',
"#{AGENT_PREFIX}/:run_id/trigger",
->(args) { handle_request(:post, args) }
'forest_workflow_executor_proxy',
:all,
"#{AGENT_PREFIX}/*path",
->(args) { handle_request(args) }
)

self
end

def handle_request(method, args = {})
def handle_request(args = {})
build(args)

base_url = configured_executor_url
run_id = args.dig(:params, 'run_id') || args.dig(:params, :run_id)
path = build_path(run_id, method)
response = forward(method, base_url, path, args)
method = (args[:method] || 'get').to_s.downcase.to_sym
response = forward(method, build_target_url(args), args)

{
content: response.body,
Expand Down Expand Up @@ -67,76 +67,85 @@ def configured_executor_url
url.to_s.sub(%r{/+\z}, '')
end

def build_path(run_id, method)
suffix = method == :post ? '/trigger' : ''
"#{EXECUTOR_PREFIX}/#{run_id}#{suffix}"
def build_target_url(args)
path = build_executor_path(args.dig(:params, 'path') || args.dig(:params, :path))
query = args[:query_string].to_s
url = "#{configured_executor_url}#{path}"

query.empty? ? url : "#{url}?#{query}"
end

# Security boundary: reject anything that could escape EXECUTOR_PREFIX.
def build_executor_path(raw_path)
path = raw_path.to_s
raise Http::Exceptions::NotFoundError, 'Invalid workflow executor path' if unsafe_path?(path)

"#{EXECUTOR_PREFIX}/#{path}"
end

def unsafe_path?(path)
return true if path.empty? || path.start_with?('/')

UNSAFE_PATH_FRAGMENTS.any? { |fragment| path.include?(fragment) }
end

def forward(method, base_url, path, args)
query = forwarded_query_params(args[:params])
def forward(method, target_url, args)
headers = forwarded_request_headers(args[:headers])
body = forwarded_body(method, args[:params])
target_url = "#{base_url}#{path}"
body = method == :get ? nil : args[:body]

client = build_client(timeout_for(method))
client.run_request(method, target_url, body, headers) do |req|
req.params.update(query) unless query.empty?
end
build_client.run_request(method, target_url, body, headers)
rescue Faraday::TimeoutError => e
raise Http::Exceptions::ServiceUnavailableError.new('Workflow executor timed out', cause: e)
rescue Faraday::ConnectionFailed => e
raise Http::Exceptions::ServiceUnavailableError.new('Workflow executor unreachable', cause: e)
# Any other transport-level Faraday failure (SSL, etc.) is an executor-reachability problem,
# not a 500 — Faraday never raises on executor 4xx/5xx (no raise_error middleware).
rescue Faraday::Error => e
raise Http::Exceptions::ServiceUnavailableError.new('Workflow executor request failed', cause: e)
end

def timeout_for(method)
method == :get ? GET_TIMEOUT : TRIGGER_TIMEOUT
end

def build_client(request_timeout)
Faraday.new(request: { open_timeout: OPEN_TIMEOUT, timeout: request_timeout }) do |f|
f.request :json
def build_client
Faraday.new(request: { open_timeout: OPEN_TIMEOUT, timeout: REQUEST_TIMEOUT }) do |f|
# No request :json middleware: body is forwarded raw (not reshaped).
f.response :json, content_type: /\bjson$/
f.adapter Faraday.default_adapter
end
end

# Strip Rails-injected routing keys; keep only true client query params.
def forwarded_query_params(params)
return {} unless params.is_a?(Hash)
# `env` is the Rack env: real HTTP headers are the HTTP_* keys (+ CONTENT_TYPE/CONTENT_LENGTH);
# rack.*/action_dispatch.*/server vars are not headers and are dropped.
def forwarded_request_headers(env)
return {} unless env.is_a?(Hash)

params.each_with_object({}) do |(key, value), acc|
next if ROUTING_KEYS.include?(key.to_s)
next if value.is_a?(Hash) || value.is_a?(Array) # 'data' body, etc.
env.each_with_object({}) do |(key, value), acc|
name = http_header_name(key.to_s)
next unless name
next if SKIPPED_HEADERS.include?(name.downcase)
next if value.nil? || value.to_s.empty?

acc[key.to_s] = value
acc[name] = value.to_s
end
end

def forwarded_request_headers(headers)
return {} unless headers.is_a?(Hash)

FORWARDED_HEADERS.each_with_object({}) do |name, acc|
value = headers[name] || headers[name.downcase] || headers["HTTP_#{name.upcase}"]
acc[name] = value if value && !value.to_s.empty?
def http_header_name(env_key)
if env_key.start_with?('HTTP_')
titleize_header(env_key.delete_prefix('HTTP_'))
elsif %w[CONTENT_TYPE CONTENT_LENGTH].include?(env_key)
titleize_header(env_key)
end
end

def forwarded_body(method, params)
return nil if method == :get
return nil unless params.is_a?(Hash)

# JSON request bodies arrive parsed under :data when sent as JSON:API,
# or as the raw top-level params hash otherwise. Prefer :data when
# present; fall back to a sanitized copy of params.
body = params['data'] || params[:data]
return body if body

params.reject { |key, _| ROUTING_KEYS.include?(key.to_s) }
def titleize_header(rack_name)
rack_name.split('_').map(&:capitalize).join('-')
end

def forwarded_response_headers(response)
content_type = response.headers['content-type'] || response.headers['Content-Type']
content_type ? { 'Content-Type' => content_type } : {}
response.headers.each_with_object({}) do |(name, value), acc|
next if name.nil? || SKIPPED_HEADERS.include?(name.to_s.downcase)
next if value.nil? || value.to_s.empty?

acc[name.to_s] = value.to_s
end

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Function with high complexity (count = 7): forwarded_response_headers [qlty:function-complexity]

end
end
end
Expand Down
Loading
Loading