Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion contrib/aws
2 changes: 1 addition & 1 deletion contrib/aws-crt-cpp
35 changes: 12 additions & 23 deletions src/Common/ProxyConfigurationResolverProvider.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -112,9 +112,8 @@ namespace
return configuration.has(config_prefix + ".uri");
}

/*
* New syntax requires protocol prefix "<http> or <https>"
* */
/* New syntax requires protocol prefix "<http> or <https>"
*/
std::optional<std::string> getProtocolPrefix(
ProxyConfiguration::Protocol request_protocol,
const String & config_prefix,
Expand All @@ -130,22 +129,18 @@ namespace
return protocol_prefix;
}

template <bool new_syntax>
std::optional<std::string> calculatePrefixBasedOnSettingsSyntax(
bool new_syntax,
ProxyConfiguration::Protocol request_protocol,
const String & config_prefix,
const Poco::Util::AbstractConfiguration & configuration
)
{
if (!configuration.has(config_prefix))
{
return std::nullopt;
}

if constexpr (new_syntax)
{
if (new_syntax)
return getProtocolPrefix(request_protocol, config_prefix, configuration);
}

return config_prefix;
}
Expand All @@ -155,24 +150,21 @@ std::shared_ptr<ProxyConfigurationResolver> ProxyConfigurationResolverProvider::
Protocol request_protocol,
const Poco::Util::AbstractConfiguration & configuration)
{
if (auto resolver = getFromSettings(request_protocol, "proxy", configuration))
{
if (auto resolver = getFromSettings(true, request_protocol, "proxy", configuration))
return resolver;
}

return std::make_shared<EnvironmentProxyConfigurationResolver>(
request_protocol,
isTunnelingDisabledForHTTPSRequestsOverHTTPProxy(configuration));
}

template <bool is_new_syntax>
std::shared_ptr<ProxyConfigurationResolver> ProxyConfigurationResolverProvider::getFromSettings(
bool new_syntax,
Protocol request_protocol,
const String & config_prefix,
const Poco::Util::AbstractConfiguration & configuration
)
const Poco::Util::AbstractConfiguration & configuration)
{
auto prefix_opt = calculatePrefixBasedOnSettingsSyntax<is_new_syntax>(request_protocol, config_prefix, configuration);
auto prefix_opt = calculatePrefixBasedOnSettingsSyntax(new_syntax, request_protocol, config_prefix, configuration);

if (!prefix_opt)
{
Expand All @@ -195,20 +187,17 @@ std::shared_ptr<ProxyConfigurationResolver> ProxyConfigurationResolverProvider::
std::shared_ptr<ProxyConfigurationResolver> ProxyConfigurationResolverProvider::getFromOldSettingsFormat(
Protocol request_protocol,
const String & config_prefix,
const Poco::Util::AbstractConfiguration & configuration
)
const Poco::Util::AbstractConfiguration & configuration)
{
/*
* First try to get it from settings only using the combination of config_prefix and configuration.
/* First try to get it from settings only using the combination of config_prefix and configuration.
* This logic exists for backward compatibility with old S3 storage specific proxy configuration.
* */
if (auto resolver = ProxyConfigurationResolverProvider::getFromSettings<false>(request_protocol, config_prefix + ".proxy", configuration))
if (auto resolver = ProxyConfigurationResolverProvider::getFromSettings(false, request_protocol, config_prefix + ".proxy", configuration))
{
return resolver;
}

/*
* In case the combination of config_prefix and configuration does not provide a resolver, try to get it from general / new settings.
/* In case the combination of config_prefix and configuration does not provide a resolver, try to get it from general / new settings.
* Falls back to Environment resolver if no configuration is found.
* */
return ProxyConfigurationResolverProvider::get(request_protocol, configuration);
Expand Down
5 changes: 2 additions & 3 deletions src/Common/ProxyConfigurationResolverProvider.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,11 @@ class ProxyConfigurationResolverProvider
);

private:
template <bool is_new_syntax = true>
static std::shared_ptr<ProxyConfigurationResolver> getFromSettings(
bool is_new_syntax,
Protocol protocol,
const String & config_prefix,
const Poco::Util::AbstractConfiguration & configuration
);
const Poco::Util::AbstractConfiguration & configuration);
};

}
2 changes: 1 addition & 1 deletion src/IO/ReadWriteBufferFromHTTP.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ ReadWriteBufferFromHTTP::ReadWriteBufferFromHTTP(

if (iter == http_header_entries.end())
{
http_header_entries.emplace_back(user_agent, fmt::format("ClickHouse/{}", VERSION_STRING));
http_header_entries.emplace_back(user_agent, fmt::format("ClickHouse/{}{}", VERSION_STRING, VERSION_OFFICIAL));
}

if (!delay_initialization && use_external_buffer)
Expand Down
6 changes: 3 additions & 3 deletions src/IO/S3/Client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -982,10 +982,10 @@ PocoHTTPClientConfiguration ClientFactory::createClientConfiguration( // NOLINT
{
auto context = Context::getGlobalContextInstance();
chassert(context);
auto proxy_configuration_resolver = DB::ProxyConfigurationResolverProvider::get(DB::ProxyConfiguration::protocolFromString(protocol), context->getConfigRef());
auto proxy_configuration_resolver = ProxyConfigurationResolverProvider::get(ProxyConfiguration::protocolFromString(protocol), context->getConfigRef());

auto per_request_configuration = [=] () { return proxy_configuration_resolver->resolve(); };
auto error_report = [=] (const DB::ProxyConfiguration & req) { proxy_configuration_resolver->errorReport(req); };
auto per_request_configuration = [=]{ return proxy_configuration_resolver->resolve(); };
auto error_report = [=](const ProxyConfiguration & req) { proxy_configuration_resolver->errorReport(req); };

auto config = PocoHTTPClientConfiguration(
per_request_configuration,
Expand Down
32 changes: 25 additions & 7 deletions src/IO/S3/PocoHTTPClient.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#include <Poco/Timespan.h>
#include <Common/config_version.h>
#include "config.h"

#if USE_AWS_S3
Expand All @@ -17,6 +18,7 @@
#include <IO/WriteBufferFromString.h>
#include <IO/Operators.h>
#include <IO/S3/ProviderType.h>
#include <Interpreters/Context.h>

#include <aws/core/http/HttpRequest.h>
#include <aws/core/http/HttpResponse.h>
Expand All @@ -29,6 +31,7 @@

#include <boost/algorithm/string.hpp>


static const int SUCCESS_RESPONSE_MIN = 200;
static const int SUCCESS_RESPONSE_MAX = 299;

Expand Down Expand Up @@ -84,7 +87,7 @@ namespace DB::S3
{

PocoHTTPClientConfiguration::PocoHTTPClientConfiguration(
std::function<DB::ProxyConfiguration()> per_request_configuration_,
std::function<ProxyConfiguration()> per_request_configuration_,
const String & force_region_,
const RemoteHostFilter & remote_host_filter_,
unsigned int s3_max_redirects_,
Expand All @@ -94,7 +97,7 @@ PocoHTTPClientConfiguration::PocoHTTPClientConfiguration(
bool s3_use_adaptive_timeouts_,
const ThrottlerPtr & get_request_throttler_,
const ThrottlerPtr & put_request_throttler_,
std::function<void(const DB::ProxyConfiguration &)> error_report_)
std::function<void(const ProxyConfiguration &)> error_report_)
: per_request_configuration(per_request_configuration_)
, force_region(force_region_)
, remote_host_filter(remote_host_filter_)
Expand All @@ -107,6 +110,8 @@ PocoHTTPClientConfiguration::PocoHTTPClientConfiguration(
, s3_use_adaptive_timeouts(s3_use_adaptive_timeouts_)
, error_report(error_report_)
{
/// This is used to identify configurations created by us.
userAgent = std::string(VERSION_FULL) + VERSION_OFFICIAL;
}

void PocoHTTPClientConfiguration::updateSchemeAndRegion()
Expand Down Expand Up @@ -166,6 +171,17 @@ PocoHTTPClient::PocoHTTPClient(const PocoHTTPClientConfiguration & client_config
{
}

PocoHTTPClient::PocoHTTPClient(const Aws::Client::ClientConfiguration & client_configuration)
: timeouts(ConnectionTimeouts()
.withConnectionTimeout(Poco::Timespan(client_configuration.connectTimeoutMs * 1000))
.withSendTimeout(Poco::Timespan(client_configuration.requestTimeoutMs * 1000))
.withReceiveTimeout(Poco::Timespan(client_configuration.requestTimeoutMs * 1000))
.withTCPKeepAliveTimeout(Poco::Timespan(
client_configuration.enableTcpKeepAlive ? client_configuration.tcpKeepAliveIntervalMs * 1000 : 0))),
remote_host_filter(Context::getGlobalContextInstance()->getRemoteHostFilter())
{
}

std::shared_ptr<Aws::Http::HttpResponse> PocoHTTPClient::MakeRequest(
const std::shared_ptr<Aws::Http::HttpRequest> & request,
Aws::Utils::RateLimits::RateLimiterInterface * readLimiter,
Expand Down Expand Up @@ -381,8 +397,11 @@ void PocoHTTPClient::makeRequestInternalImpl(

try
{
const auto proxy_configuration = per_request_configuration();
for (unsigned int attempt = 0; attempt <= s3_max_redirects; ++attempt)
ProxyConfiguration proxy_configuration;
if (per_request_configuration)
proxy_configuration = per_request_configuration();

for (size_t attempt = 0; attempt <= s3_max_redirects; ++attempt)
{
Poco::URI target_uri(uri);

Expand Down Expand Up @@ -500,7 +519,6 @@ void PocoHTTPClient::makeRequestInternalImpl(
LOG_TEST(log, "Redirecting request to new location: {}", location);

addMetric(request, S3MetricType::Redirects);

continue;
}

Expand Down Expand Up @@ -548,9 +566,9 @@ void PocoHTTPClient::makeRequestInternalImpl(
}
else
{

if (status_code == 429 || status_code == 503)
{ // API throttling
{
/// API throttling
addMetric(request, S3MetricType::Throttling);
}
else if (status_code >= 300)
Expand Down
27 changes: 16 additions & 11 deletions src/IO/S3/PocoHTTPClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,25 +20,28 @@
#include <aws/core/http/HttpRequest.h>
#include <aws/core/http/standard/StandardHttpResponse.h>


namespace Aws::Http::Standard
{
class StandardHttpResponse;
}

namespace DB
{

class Context;
}


namespace DB::S3
{

class ClientFactory;
class PocoHTTPClient;


struct PocoHTTPClientConfiguration : public Aws::Client::ClientConfiguration
{
std::function<DB::ProxyConfiguration()> per_request_configuration;
std::function<ProxyConfiguration()> per_request_configuration;
String force_region;
const RemoteHostFilter & remote_host_filter;
unsigned int s3_max_redirects;
Expand All @@ -54,13 +57,13 @@ struct PocoHTTPClientConfiguration : public Aws::Client::ClientConfiguration
size_t http_keep_alive_timeout = DEFAULT_HTTP_KEEP_ALIVE_TIMEOUT;
size_t http_keep_alive_max_requests = DEFAULT_HTTP_KEEP_ALIVE_MAX_REQUEST;

std::function<void(const DB::ProxyConfiguration &)> error_report;
std::function<void(const ProxyConfiguration &)> error_report;

void updateSchemeAndRegion();

private:
PocoHTTPClientConfiguration(
std::function<DB::ProxyConfiguration()> per_request_configuration_,
std::function<ProxyConfiguration()> per_request_configuration_,
const String & force_region_,
const RemoteHostFilter & remote_host_filter_,
unsigned int s3_max_redirects_,
Expand All @@ -70,13 +73,13 @@ struct PocoHTTPClientConfiguration : public Aws::Client::ClientConfiguration
bool s3_use_adaptive_timeouts_,
const ThrottlerPtr & get_request_throttler_,
const ThrottlerPtr & put_request_throttler_,
std::function<void(const DB::ProxyConfiguration &)> error_report_
);
std::function<void(const ProxyConfiguration &)> error_report_);

/// Constructor of Aws::Client::ClientConfiguration must be called after AWS SDK initialization.
friend ClientFactory;
};


class PocoHTTPResponse : public Aws::Http::Standard::StandardHttpResponse
{
public:
Expand Down Expand Up @@ -116,10 +119,12 @@ class PocoHTTPResponse : public Aws::Http::Standard::StandardHttpResponse
Aws::Utils::Stream::ResponseStream body_stream;
};


class PocoHTTPClient : public Aws::Http::HttpClient
{
public:
explicit PocoHTTPClient(const PocoHTTPClientConfiguration & client_configuration);
explicit PocoHTTPClient(const Aws::Client::ClientConfiguration & client_configuration);
~PocoHTTPClient() override = default;

std::shared_ptr<Aws::Http::HttpResponse> MakeRequest(
Expand Down Expand Up @@ -166,14 +171,14 @@ class PocoHTTPClient : public Aws::Http::HttpClient
static S3MetricKind getMetricKind(const Aws::Http::HttpRequest & request);
void addMetric(const Aws::Http::HttpRequest & request, S3MetricType type, ProfileEvents::Count amount = 1) const;

std::function<DB::ProxyConfiguration()> per_request_configuration;
std::function<void(const DB::ProxyConfiguration &)> error_report;
std::function<ProxyConfiguration()> per_request_configuration;
std::function<void(const ProxyConfiguration &)> error_report;
ConnectionTimeouts timeouts;
const RemoteHostFilter & remote_host_filter;
unsigned int s3_max_redirects;
unsigned int s3_max_redirects = 0;
bool s3_use_adaptive_timeouts = true;
bool enable_s3_requests_logging;
bool for_disk_s3;
bool enable_s3_requests_logging = false;
bool for_disk_s3 = false;

/// Limits get request per second rate for GET, SELECT and all other requests, excluding throttled by put throttler
/// (i.e. throttles GetObject, HeadObject)
Expand Down
5 changes: 4 additions & 1 deletion src/IO/S3/PocoHTTPClientFactory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,10 @@ namespace DB::S3
std::shared_ptr<Aws::Http::HttpClient>
PocoHTTPClientFactory::CreateHttpClient(const Aws::Client::ClientConfiguration & client_configuration) const
{
return std::make_shared<PocoHTTPClient>(static_cast<const PocoHTTPClientConfiguration &>(client_configuration));
if (client_configuration.userAgent.starts_with("ClickHouse"))
return std::make_shared<PocoHTTPClient>(static_cast<const PocoHTTPClientConfiguration &>(client_configuration));
else /// This client is created inside the AWS SDK with default settings to obtain ECS credentials from localhost.
return std::make_shared<PocoHTTPClient>(client_configuration);
}

std::shared_ptr<Aws::Http::HttpRequest> PocoHTTPClientFactory::CreateHttpRequest(
Expand Down
4 changes: 4 additions & 0 deletions tests/queries/0_stateless/03170_ecs_crash.reference
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
1 2 3
4 5 6
7 8 9
0 0 0
9 changes: 9 additions & 0 deletions tests/queries/0_stateless/03170_ecs_crash.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
#!/usr/bin/env bash
# Tags: no-fasttest

CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CUR_DIR"/../shell_config.sh

# Previous versions crashed in attempt to use this authentication method (regardless of whether it was able to authenticate):
AWS_CONTAINER_CREDENTIALS_FULL_URI=http://localhost:1338/latest/meta-data/container/security-credentials $CLICKHOUSE_LOCAL -q "select * from s3('http://localhost:11111/test/a.tsv')"
Loading