Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,7 @@
import io.temporal.common.interceptors.WorkflowClientCallsInterceptor;
import io.temporal.common.interceptors.WorkflowClientInterceptor;
import io.temporal.internal.WorkflowThreadMarker;
import io.temporal.internal.client.NexusStartWorkflowRequest;
import io.temporal.internal.client.RootWorkflowClientInvoker;
import io.temporal.internal.client.WorkerFactoryRegistry;
import io.temporal.internal.client.WorkflowClientInternal;
import io.temporal.internal.client.*;
import io.temporal.internal.client.external.GenericWorkflowClient;
import io.temporal.internal.client.external.GenericWorkflowClientImpl;
import io.temporal.internal.client.external.ManualActivityCompletionClientFactory;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import io.temporal.common.interceptors.ScheduleClientCallsInterceptor;
import io.temporal.common.interceptors.ScheduleClientInterceptor;
import io.temporal.internal.WorkflowThreadMarker;
import io.temporal.internal.client.NamespaceInjectWorkflowServiceStubs;
import io.temporal.internal.client.RootScheduleClientInvoker;
import io.temporal.internal.client.external.GenericWorkflowClient;
import io.temporal.internal.client.external.GenericWorkflowClientImpl;
Expand Down Expand Up @@ -60,6 +61,8 @@ public static ScheduleClient newInstance(
}

ScheduleClientImpl(WorkflowServiceStubs workflowServiceStubs, ScheduleClientOptions options) {
workflowServiceStubs =
new NamespaceInjectWorkflowServiceStubs(workflowServiceStubs, options.getNamespace());
this.workflowServiceStubs = workflowServiceStubs;
this.options = options;
this.metricsScope =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
* limitations under the License.
*/

package io.temporal.client;
package io.temporal.internal.client;

import io.grpc.ManagedChannel;
import io.grpc.Metadata;
Expand All @@ -34,8 +34,8 @@
import java.util.function.Supplier;
import javax.annotation.Nullable;

/** Inject the namespace into the gRPC header */
class NamespaceInjectWorkflowServiceStubs implements WorkflowServiceStubs {
/** Inject the namespace into the gRPC header, overriding the current namespace if already set. */
public class NamespaceInjectWorkflowServiceStubs implements WorkflowServiceStubs {
private static Metadata.Key<String> TEMPORAL_NAMESPACE_HEADER_KEY =
Metadata.Key.of("temporal-namespace", Metadata.ASCII_STRING_MARSHALLER);
private final Metadata metadata;
Expand All @@ -56,14 +56,14 @@ public WorkflowServiceStubsOptions getOptions() {
public WorkflowServiceGrpc.WorkflowServiceBlockingStub blockingStub() {
return next.blockingStub()
.withInterceptors(
new GrpcMetadataProviderInterceptor(Collections.singleton(() -> metadata)));
new GrpcMetadataProviderInterceptor(Collections.singleton(() -> metadata), true));
}

@Override
public WorkflowServiceGrpc.WorkflowServiceFutureStub futureStub() {
return next.futureStub()
.withInterceptors(
new GrpcMetadataProviderInterceptor(Collections.singleton(() -> metadata)));
new GrpcMetadataProviderInterceptor(Collections.singleton(() -> metadata), true));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,17 @@

public class GrpcMetadataProviderInterceptor implements ClientInterceptor {
private final Collection<GrpcMetadataProvider> grpcMetadataProviders;
private final boolean override;

public GrpcMetadataProviderInterceptor(Collection<GrpcMetadataProvider> grpcMetadataProviders) {
this.grpcMetadataProviders = checkNotNull(grpcMetadataProviders, "grpcMetadataProviders");
this.override = false;
}

public GrpcMetadataProviderInterceptor(
Collection<GrpcMetadataProvider> grpcMetadataProviders, boolean override) {
this.grpcMetadataProviders = checkNotNull(grpcMetadataProviders, "grpcMetadataProviders");
this.override = override;
}

@Override
Expand All @@ -47,7 +55,20 @@ private final class HeaderAttachingClientCall<ReqT, RespT>

@Override
public void start(Listener<RespT> responseListener, Metadata headers) {
grpcMetadataProviders.stream().map(GrpcMetadataProvider::getMetadata).forEach(headers::merge);
grpcMetadataProviders.stream()
.map(GrpcMetadataProvider::getMetadata)
.forEach(
m -> {
// If override is true, discard all existing headers with the same key
// before adding the new ones. Otherwise, merge will add the new value to the
// existing key.
if (override) {
for (String key : m.keys()) {
headers.discardAll(Metadata.Key.of(key, Metadata.ASCII_STRING_MARSHALLER));
}
}
headers.merge(m);
});
super.start(responseListener, headers);
}
}
Expand Down
Loading