Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,14 @@ public GrpcConfiguration getConfiguration() {
return configuration;
}

/**
* Returns the actual local port the gRPC server is listening on. Useful when the server was started with port 0
* (OS-assigned port).
*/
public int getLocalPort() {
return server != null ? server.getPort() : -1;
}

@Override
protected void doStart() throws Exception {
super.doStart();
Expand All @@ -94,7 +102,7 @@ protected void initializeServer() throws Exception {
BindableService bindableService = getBindableServiceFactory().createBindableService(this);
ServerInterceptor headerInterceptor = new GrpcHeaderInterceptor();

if (ObjectHelper.isNotEmpty(configuration.getHost()) && configuration.getPort() > 0) {
if (ObjectHelper.isNotEmpty(configuration.getHost()) && configuration.getPort() >= 0) {
LOG.debug("Building gRPC server on {}:{}", configuration.getHost(), configuration.getPort());
serverBuilder
= NettyServerBuilder.forAddress(new InetSocketAddress(configuration.getHost(), configuration.getPort()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,10 @@
import io.grpc.ManagedChannelBuilder;
import io.grpc.stub.StreamObserver;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.test.AvailablePortFinder;
import org.apache.camel.test.junit6.CamelTestSupport;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -42,10 +40,6 @@ public class GrpcConsumerAggregationTest extends CamelTestSupport {

private static final Logger LOG = LoggerFactory.getLogger(GrpcConsumerAggregationTest.class);

@RegisterExtension
static AvailablePortFinder.Port grpcSyncRequestTestPort = AvailablePortFinder.find();
@RegisterExtension
static AvailablePortFinder.Port grpcAsyncRequestTestPort = AvailablePortFinder.find();
private static final int GRPC_TEST_PING_ID = 1;
private static final String GRPC_TEST_PING_VALUE = "PING";
private static final String GRPC_TEST_PONG_VALUE = "PONG";
Expand All @@ -58,10 +52,12 @@ public class GrpcConsumerAggregationTest extends CamelTestSupport {

@BeforeEach
public void startGrpcChannels() {
syncRequestChannel
= ManagedChannelBuilder.forAddress("localhost", grpcSyncRequestTestPort.getPort()).usePlaintext().build();
asyncRequestChannel
= ManagedChannelBuilder.forAddress("localhost", grpcAsyncRequestTestPort.getPort()).usePlaintext().build();
syncRequestChannel = ManagedChannelBuilder
.forAddress("localhost", ((GrpcConsumer) context.getRoute("grpc-sync").getConsumer()).getLocalPort())
.usePlaintext().build();
asyncRequestChannel = ManagedChannelBuilder
.forAddress("localhost", ((GrpcConsumer) context.getRoute("grpc-async").getConsumer()).getLocalPort())
.usePlaintext().build();
blockingStub = PingPongGrpc.newBlockingStub(syncRequestChannel);
nonBlockingStub = PingPongGrpc.newStub(syncRequestChannel);
asyncNonBlockingStub = PingPongGrpc.newStub(asyncRequestChannel);
Expand Down Expand Up @@ -186,12 +182,12 @@ protected RouteBuilder createRouteBuilder() {
return new RouteBuilder() {
@Override
public void configure() {
from("grpc://localhost:" + grpcSyncRequestTestPort.getPort()
+ "/org.apache.camel.component.grpc.PingPong?synchronous=true&consumerStrategy=AGGREGATION")
from("grpc://localhost:0/org.apache.camel.component.grpc.PingPong?synchronous=true&consumerStrategy=AGGREGATION")
.routeId("grpc-sync")
.bean(new GrpcMessageBuilder(), "buildPongResponse");

from("grpc://localhost:" + grpcAsyncRequestTestPort.getPort()
+ "/org.apache.camel.component.grpc.PingPong?synchronous=true&consumerStrategy=AGGREGATION")
from("grpc://localhost:0/org.apache.camel.component.grpc.PingPong?synchronous=true&consumerStrategy=AGGREGATION")
.routeId("grpc-async")
.bean(new GrpcMessageBuilder(), "buildAsyncPongResponse");
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,8 @@
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.test.AvailablePortFinder;
import org.apache.camel.test.junit6.CamelTestSupport;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -43,10 +41,6 @@
public class GrpcConsumerConcurrentTest extends CamelTestSupport {
private static final Logger LOG = LoggerFactory.getLogger(GrpcConsumerConcurrentTest.class);

@RegisterExtension
static AvailablePortFinder.Port grpcAsyncRequestTestPort = AvailablePortFinder.find();
@RegisterExtension
static AvailablePortFinder.Port grpcHeadersTestPort = AvailablePortFinder.find();
private static final int CONCURRENT_THREAD_COUNT = 30;
private static final int ROUNDS_PER_THREAD_COUNT = 10;
private static final String GRPC_TEST_PING_VALUE = "PING";
Expand All @@ -64,13 +58,14 @@ public static Integer getId() {

@Test
public void testAsyncWithConcurrentThreads() {
int asyncPort = ((GrpcConsumer) context.getRoute("grpc-async").getConsumer()).getLocalPort();
RunnableAssert ra = new RunnableAssert("foo") {

@Override
public void run() {
final CountDownLatch latch = new CountDownLatch(1);
ManagedChannel asyncRequestChannel
= NettyChannelBuilder.forAddress("localhost", grpcAsyncRequestTestPort.getPort()).usePlaintext()
= NettyChannelBuilder.forAddress("localhost", asyncPort).usePlaintext()
.build();
PingPongGrpc.PingPongStub asyncNonBlockingStub = PingPongGrpc.newStub(asyncRequestChannel);

Expand Down Expand Up @@ -105,13 +100,14 @@ public void run() {

@Test
public void testHeadersWithConcurrentThreads() {
int headersPort = ((GrpcConsumer) context.getRoute("grpc-headers").getConsumer()).getLocalPort();
RunnableAssert ra = new RunnableAssert("foo") {

@Override
public void run() {
int instanceId = createId();
final CountDownLatch latch = new CountDownLatch(1);
ManagedChannel asyncRequestChannel = NettyChannelBuilder.forAddress("localhost", grpcHeadersTestPort.getPort())
ManagedChannel asyncRequestChannel = NettyChannelBuilder.forAddress("localhost", headersPort)
.userAgent(GRPC_USER_AGENT_PREFIX + instanceId)
.usePlaintext().build();
PingPongGrpc.PingPongStub asyncNonBlockingStub = PingPongGrpc.newStub(asyncRequestChannel);
Expand Down Expand Up @@ -149,12 +145,12 @@ protected RouteBuilder createRouteBuilder() {
return new RouteBuilder() {
@Override
public void configure() {
from("grpc://localhost:" + grpcAsyncRequestTestPort.getPort()
+ "/org.apache.camel.component.grpc.PingPong?synchronous=true&consumerStrategy=AGGREGATION")
from("grpc://localhost:0/org.apache.camel.component.grpc.PingPong?synchronous=true&consumerStrategy=AGGREGATION")
.routeId("grpc-async")
.bean(new GrpcMessageBuilder(), "buildAsyncPongResponse");

from("grpc://localhost:" + grpcHeadersTestPort.getPort()
+ "/org.apache.camel.component.grpc.PingPong?synchronous=true&consumerStrategy=AGGREGATION")
from("grpc://localhost:0/org.apache.camel.component.grpc.PingPong?synchronous=true&consumerStrategy=AGGREGATION")
.routeId("grpc-headers")
.process(new HeaderExchangeProcessor());
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,10 @@
import io.grpc.stub.StreamObserver;
import org.apache.camel.CamelException;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.test.AvailablePortFinder;
import org.apache.camel.test.junit6.CamelTestSupport;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -43,8 +41,6 @@ public class GrpcConsumerExceptionTest extends CamelTestSupport {

private static final Logger LOG = LoggerFactory.getLogger(GrpcConsumerExceptionTest.class);

@RegisterExtension
static AvailablePortFinder.Port grpcSyncRequestTestPort = AvailablePortFinder.find();
private static final int GRPC_TEST_PING_ID = 1;
private static final String GRPC_TEST_PING_VALUE = "PING";

Expand All @@ -54,8 +50,8 @@ public class GrpcConsumerExceptionTest extends CamelTestSupport {

@BeforeEach
public void startGrpcChannels() {
syncRequestChannel
= ManagedChannelBuilder.forAddress("localhost", grpcSyncRequestTestPort.getPort()).usePlaintext().build();
int port = ((GrpcConsumer) context.getRoute("grpc-exception").getConsumer()).getLocalPort();
syncRequestChannel = ManagedChannelBuilder.forAddress("localhost", port).usePlaintext().build();
blockingStub = PingPongGrpc.newBlockingStub(syncRequestChannel);
nonBlockingStub = PingPongGrpc.newStub(syncRequestChannel);
}
Expand Down Expand Up @@ -96,8 +92,8 @@ protected RouteBuilder createRouteBuilder() {
return new RouteBuilder() {
@Override
public void configure() {
from("grpc://localhost:" + grpcSyncRequestTestPort.getPort()
+ "/org.apache.camel.component.grpc.PingPong?synchronous=true")
from("grpc://localhost:0/org.apache.camel.component.grpc.PingPong?synchronous=true")
.routeId("grpc-exception")
.throwException(CamelException.class, "GRPC Camel exception message");

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,10 @@
import io.grpc.stub.StreamObserver;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.test.AvailablePortFinder;
import org.apache.camel.test.junit6.CamelTestSupport;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -41,10 +39,6 @@ public class GrpcConsumerPropagationTest extends CamelTestSupport {

private static final Logger LOG = LoggerFactory.getLogger(GrpcConsumerPropagationTest.class);

@RegisterExtension
static AvailablePortFinder.Port grpcAsyncNextRequestTestPort = AvailablePortFinder.find();
@RegisterExtension
static AvailablePortFinder.Port grpcAsyncCompletedRequestTestPort = AvailablePortFinder.find();
private static final int GRPC_TEST_PING_ID = 1;
private static final String GRPC_TEST_PING_VALUE = "PING";
private static final String GRPC_TEST_PONG_VALUE = "PONG";
Expand All @@ -56,11 +50,12 @@ public class GrpcConsumerPropagationTest extends CamelTestSupport {

@BeforeEach
public void startGrpcChannels() {
asyncOnNextChannel
= ManagedChannelBuilder.forAddress("localhost", grpcAsyncNextRequestTestPort.getPort()).usePlaintext().build();
asyncOnCompletedChannel
= ManagedChannelBuilder.forAddress("localhost", grpcAsyncCompletedRequestTestPort.getPort()).usePlaintext()
.build();
asyncOnNextChannel = ManagedChannelBuilder
.forAddress("localhost", ((GrpcConsumer) context.getRoute("grpc-on-next").getConsumer()).getLocalPort())
.usePlaintext().build();
asyncOnCompletedChannel = ManagedChannelBuilder
.forAddress("localhost", ((GrpcConsumer) context.getRoute("grpc-on-completed").getConsumer()).getLocalPort())
.usePlaintext().build();
asyncOnNextStub = PingPongGrpc.newStub(asyncOnNextChannel);
asyncOnCompletedStub = PingPongGrpc.newStub(asyncOnCompletedChannel);
}
Expand Down Expand Up @@ -126,13 +121,13 @@ protected RouteBuilder createRouteBuilder() {
@Override
public void configure() {

from("grpc://localhost:" + grpcAsyncNextRequestTestPort.getPort()
+ "/org.apache.camel.component.grpc.PingPong?consumerStrategy=PROPAGATION")
from("grpc://localhost:0/org.apache.camel.component.grpc.PingPong?consumerStrategy=PROPAGATION")
.routeId("grpc-on-next")
.to("mock:async-on-next-propagation")
.bean(new GrpcMessageBuilder(), "buildAsyncPongResponse");

from("grpc://localhost:" + grpcAsyncCompletedRequestTestPort.getPort()
+ "/org.apache.camel.component.grpc.PingPong?consumerStrategy=PROPAGATION&forwardOnCompleted=true")
from("grpc://localhost:0/org.apache.camel.component.grpc.PingPong?consumerStrategy=PROPAGATION&forwardOnCompleted=true")
.routeId("grpc-on-completed")
.to("mock:async-on-completed-propagation");
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,11 @@
import org.apache.camel.component.grpc.auth.jwt.JwtCallCredentials;
import org.apache.camel.component.grpc.auth.jwt.JwtHelper;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.test.AvailablePortFinder;
import org.apache.camel.test.junit6.CamelTestSupport;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assumptions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -52,12 +50,6 @@ public class GrpcConsumerSecurityTest extends CamelTestSupport {

private static final Logger LOG = LoggerFactory.getLogger(GrpcConsumerSecurityTest.class);

@RegisterExtension
static AvailablePortFinder.Port grpcTlsTestPort = AvailablePortFinder.find();
@RegisterExtension
static AvailablePortFinder.Port grpcJwtCorrectTestPort = AvailablePortFinder.find();
@RegisterExtension
static AvailablePortFinder.Port grpcJwtIncorrectTestPort = AvailablePortFinder.find();
private static final int GRPC_TEST_PING_ID = 1;
private static final String GRPC_TEST_PING_VALUE = "PING";
private static final String GRPC_TEST_PONG_VALUE = "PONG";
Expand Down Expand Up @@ -85,14 +77,16 @@ public void startGrpcChannels() throws SSLException {

Assumptions.assumeTrue(sslContext instanceof OpenSslClientContext || sslContext instanceof JdkSslContext);

tlsChannel = NettyChannelBuilder.forAddress("localhost", grpcTlsTestPort.getPort())
int tlsPort = ((GrpcConsumer) context.getRoute("grpc-tls").getConsumer()).getLocalPort();
int jwtCorrectPort = ((GrpcConsumer) context.getRoute("grpc-jwt-correct").getConsumer()).getLocalPort();
int jwtIncorrectPort = ((GrpcConsumer) context.getRoute("grpc-jwt-incorrect").getConsumer()).getLocalPort();

tlsChannel = NettyChannelBuilder.forAddress("localhost", tlsPort)
.sslContext(sslContext)
.build();

jwtCorrectChannel
= NettyChannelBuilder.forAddress("localhost", grpcJwtCorrectTestPort.getPort()).usePlaintext().build();
jwtIncorrectChannel
= NettyChannelBuilder.forAddress("localhost", grpcJwtIncorrectTestPort.getPort()).usePlaintext().build();
jwtCorrectChannel = NettyChannelBuilder.forAddress("localhost", jwtCorrectPort).usePlaintext().build();
jwtIncorrectChannel = NettyChannelBuilder.forAddress("localhost", jwtIncorrectPort).usePlaintext().build();

tlsAsyncStub = PingPongGrpc.newStub(tlsChannel);
jwtCorrectAsyncStub
Expand Down Expand Up @@ -193,22 +187,25 @@ protected RouteBuilder createRouteBuilder() {
@Override
public void configure() {

from("grpc://localhost:" + grpcTlsTestPort.getPort()
+ "/org.apache.camel.component.grpc.PingPong?consumerStrategy=PROPAGATION&"
from("grpc://localhost:0" +
"/org.apache.camel.component.grpc.PingPong?consumerStrategy=PROPAGATION&"
+ "negotiationType=TLS&keyCertChainResource=file:src/test/resources/certs/server.pem&"
+ "keyResource=file:src/test/resources/certs/server.key&trustCertCollectionResource=file:src/test/resources/certs/ca.pem")
.routeId("grpc-tls")
.to("mock:tls-enable")
.bean(new GrpcMessageBuilder(), "buildAsyncPongResponse");

from("grpc://localhost:" + grpcJwtCorrectTestPort.getPort()
+ "/org.apache.camel.component.grpc.PingPong?consumerStrategy=PROPAGATION&"
from("grpc://localhost:0" +
"/org.apache.camel.component.grpc.PingPong?consumerStrategy=PROPAGATION&"
+ "authenticationType=JWT&jwtSecret=" + GRPC_JWT_CORRECT_SECRET)
.routeId("grpc-jwt-correct")
.to("mock:jwt-correct-secret")
.bean(new GrpcMessageBuilder(), "buildAsyncPongResponse");

from("grpc://localhost:" + grpcJwtIncorrectTestPort.getPort()
+ "/org.apache.camel.component.grpc.PingPong?consumerStrategy=PROPAGATION&"
from("grpc://localhost:0" +
"/org.apache.camel.component.grpc.PingPong?consumerStrategy=PROPAGATION&"
+ "authenticationType=JWT&jwtSecret=" + GRPC_JWT_CORRECT_SECRET)
.routeId("grpc-jwt-incorrect")
.to("mock:jwt-incorrect-secret")
.bean(new GrpcMessageBuilder(), "buildAsyncPongResponse");
}
Expand Down
Loading
Loading