Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 2 additions & 8 deletions google/cloud/storage/async/client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -136,10 +136,7 @@ AsyncClient::StartAppendableObjectUpload(
.then([](auto f) -> StatusOr<std::pair<AsyncWriter, AsyncToken>> {
auto w = f.get();
if (!w) return std::move(w).status();
auto t = absl::holds_alternative<google::storage::v2::Object>(
(*w)->PersistedState())
? AsyncToken()
: storage_internal::MakeAsyncToken(w->get());
auto t = storage_internal::MakeAsyncToken(w->get());
return std::make_pair(AsyncWriter(*std::move(w)), std::move(t));
});
}
Expand All @@ -163,10 +160,7 @@ AsyncClient::ResumeAppendableObjectUpload(BucketName const& bucket_name,
.then([](auto f) -> StatusOr<std::pair<AsyncWriter, AsyncToken>> {
auto w = f.get();
if (!w) return std::move(w).status();
auto t = absl::holds_alternative<google::storage::v2::Object>(
(*w)->PersistedState())
? AsyncToken()
: storage_internal::MakeAsyncToken(w->get());
auto t = storage_internal::MakeAsyncToken(w->get());
return std::make_pair(AsyncWriter(*std::move(w)), std::move(t));
});
}
Expand Down
6 changes: 3 additions & 3 deletions google/cloud/storage/async/client_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -400,7 +400,7 @@ TEST(AsyncClient, StartAppendableObjectUpload1) {
EXPECT_TRUE(TextFormat::ParseFromString(kExpectedRequest, &expected));
EXPECT_THAT(p.request, IsProtoEqual(expected));
auto writer = std::make_unique<MockAsyncWriterConnection>();
EXPECT_CALL(*writer, PersistedState).WillOnce(Return(0));
EXPECT_CALL(*writer, PersistedState).Times(0);
EXPECT_CALL(*writer, Finalize).WillRepeatedly([] {
return make_ready_future(make_status_or(TestProtoObject()));
});
Expand Down Expand Up @@ -446,7 +446,7 @@ TEST(AsyncClient, StartAppendableObjectUpload2) {
EXPECT_TRUE(TextFormat::ParseFromString(kExpectedRequest, &expected));
EXPECT_THAT(p.request, IsProtoEqual(expected));
auto writer = std::make_unique<MockAsyncWriterConnection>();
EXPECT_CALL(*writer, PersistedState).WillOnce(Return(0));
EXPECT_CALL(*writer, PersistedState).Times(0);
EXPECT_CALL(*writer, Finalize).WillRepeatedly([] {
return make_ready_future(make_status_or(TestProtoObject()));
});
Expand Down Expand Up @@ -513,7 +513,7 @@ TEST(AsyncClient, ResumeAppendableObjectUpload1) {
AsyncWriter w;
AsyncToken t;
std::tie(w, t) = *std::move(wt);
EXPECT_FALSE(t.valid());
EXPECT_TRUE(t.valid());
EXPECT_THAT(w.PersistedState(), VariantWith<google::storage::v2::Object>(
IsProtoEqual(TestProtoObject())));
}
Expand Down
15 changes: 11 additions & 4 deletions google/cloud/storage/internal/async/connection_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -384,10 +384,17 @@ AsyncConnectionImpl::AppendableObjectUploadImpl(AppendableUploadParams p) {
std::unique_ptr<storage_experimental::AsyncWriterConnection>> {
auto rpc = f.get();
if (!rpc) return std::move(rpc).status();
persisted_size = rpc->first_response.resource().size();
auto impl = std::make_unique<AsyncWriterConnectionImpl>(
current, request, std::move(rpc->stream), hash, persisted_size,
false);
std::unique_ptr<AsyncWriterConnectionImpl> impl;
if (rpc->first_response.has_resource()) {
impl = std::make_unique<AsyncWriterConnectionImpl>(
current, request, std::move(rpc->stream), hash,
rpc->first_response.resource(), false);
} else {
persisted_size = rpc->first_response.persisted_size();
impl = std::make_unique<AsyncWriterConnectionImpl>(
current, request, std::move(rpc->stream), hash, persisted_size,
false);
}
return MakeWriterConnectionResumed(std::move(fa), std::move(impl),
std::move(request), std::move(hash),
rpc->first_response, *current);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ std::unique_ptr<AsyncBidiWriteObjectStream> MakeSuccessfulAppendStream(
return sequencer.PushBack("Read(PersistedSize)")
.then([persisted_size](auto) {
auto response = google::storage::v2::BidiWriteObjectResponse{};
response.mutable_resource()->set_size(persisted_size);
response.set_persisted_size(persisted_size);
return absl::make_optional(std::move(response));
});
})
Expand Down
10 changes: 5 additions & 5 deletions google/cloud/storage/internal/async/writer_connection_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -60,11 +60,11 @@ AsyncWriterConnectionImpl::AsyncWriterConnectionImpl(
google::storage::v2::BidiWriteObjectRequest request,
std::unique_ptr<StreamingRpc> impl,
std::shared_ptr<storage::internal::HashFunction> hash_function,
google::storage::v2::Object metadata)
: AsyncWriterConnectionImpl(std::move(options), std::move(request),
std::move(impl), std::move(hash_function),
PersistedStateType(std::move(metadata)),
/*offset=*/0) {}
google::storage::v2::Object metadata, bool first_request)
: AsyncWriterConnectionImpl(
std::move(options), std::move(request), std::move(impl),
std::move(hash_function), PersistedStateType(metadata),
/*offset=*/metadata.size(), std::move(first_request)) {}

AsyncWriterConnectionImpl::AsyncWriterConnectionImpl(
google::cloud::internal::ImmutableOptions options,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ class AsyncWriterConnectionImpl
google::storage::v2::BidiWriteObjectRequest request,
std::unique_ptr<StreamingRpc> impl,
std::shared_ptr<storage::internal::HashFunction> hash_function,
google::storage::v2::Object metadata);
google::storage::v2::Object metadata, bool first_request = true);
~AsyncWriterConnectionImpl() override;

void Cancel() override { return impl_->Cancel(); }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,14 +78,10 @@ class AsyncWriterConnectionResumedState
options_ = internal::MakeImmutableOptions(options);
auto state = impl_->PersistedState();
if (absl::holds_alternative<google::storage::v2::Object>(state)) {
SetFinalized(std::unique_lock<std::mutex>(mu_),
absl::get<google::storage::v2::Object>(std::move(state)));
cancelled_ = true;
resume_status_ = internal::CancelledError("upload already finalized",
GCP_ERROR_INFO());
return;
buffer_offset_ = absl::get<google::storage::v2::Object>(state).size();
} else {
buffer_offset_ = absl::get<std::int64_t>(state);
}
buffer_offset_ = absl::get<std::int64_t>(state);
}

void Cancel() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,11 @@ TEST(WriteConnectionResumed, FinalizedOnConstruction) {
auto mock = std::make_unique<MockAsyncWriterConnection>();
EXPECT_CALL(*mock, UploadId).WillRepeatedly(Return("test-upload-id"));
EXPECT_CALL(*mock, PersistedState).WillRepeatedly(Return(TestObject()));
EXPECT_CALL(*mock, Finalize).Times(0);
EXPECT_CALL(*mock, Finalize(_)).WillOnce([&](auto) {
return sequencer.PushBack("Finalize").then([](auto) {
return StatusOr<google::storage::v2::Object>(TestObject());
});
});

MockFactory mock_factory;
EXPECT_CALL(mock_factory, Call).Times(0);
Expand All @@ -115,6 +119,11 @@ TEST(WriteConnectionResumed, FinalizedOnConstruction) {
VariantWith<google::storage::v2::Object>(IsProtoEqual(TestObject())));

auto finalize = connection->Finalize({});

EXPECT_FALSE(finalize.is_ready());
auto next = sequencer.PopFrontWithName();
EXPECT_EQ(next.second, "Finalize");
next.first.set_value(true);
EXPECT_TRUE(finalize.is_ready());
EXPECT_THAT(finalize.get(), IsOkAndHolds(IsProtoEqual(TestObject())));
}
Expand Down
Loading