新建maven项目
pom.xml
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40
| <dependencies> <dependency> <groupId>io.grpc</groupId> <artifactId>grpc-all</artifactId> <version>1.5.0</version> </dependency> </dependencies> <build> <extensions> <extension> <groupId>kr.motd.maven</groupId> <artifactId>os-maven-plugin</artifactId> <version>1.4.1.Final</version> </extension> </extensions> <plugins> <plugin> <groupId>org.xolstice.maven.plugins</groupId> <artifactId>protobuf-maven-plugin</artifactId> <version>0.5.0</version> <configuration> <protocArtifact>com.google.protobuf:protoc:3.3.0:exe:${os.detected.classifier} </protocArtifact> <pluginId>grpc-java</pluginId> <pluginArtifact>io.grpc:protoc-gen-grpc-java:1.5.0:exe:${os.detected.classifier} </pluginArtifact> <protoSourceRoot>src/main/resources</protoSourceRoot> </configuration> <executions> <execution> <goals> <goal>compile</goal> <goal>compile-custom</goal> </goals> </execution> </executions> </plugin> </plugins> </build>
|
定义服务
新建文件 src/main/resources/test.proto
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| syntax = "proto3";
option java_multiple_files = true; option java_package = "com.acupt.grpc.proto"; option java_outer_classname = "HelloProto";
package com.acupt.grpc;
service HelloService { rpc hello (InvokeRequest) returns (InvokeResponse) { } }
message InvokeRequest { string name = 1; }
message InvokeResponse { string msg = 1; }
|
构建
使用maven插件根据.proto文件生成Java代码,插件已在pom.xml中配置,只需执行命令:
mvn install
构建完成后可以在target中找到生成的Java代码,用这些代码可以实现gRPC远程调用。
但在项目中还无法直接引用上面的类,右键 -> Mark Directory as -> Generated Sources Root
现在就可以在项目中引用了
代码
3个类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| package com.acupt.grpc;
import com.acupt.grpc.proto.HelloServiceGrpc; import com.acupt.grpc.proto.InvokeRequest; import com.acupt.grpc.proto.InvokeResponse; import io.grpc.stub.StreamObserver;
public class HelloService extends HelloServiceGrpc.HelloServiceImplBase {
@Override public void hello(InvokeRequest request, StreamObserver<InvokeResponse> responseObserver) { System.out.println("request -> " + request); String name = request.getName(); InvokeResponse response = InvokeResponse.newBuilder() .setMsg("hello," + name) .build(); responseObserver.onNext(response); responseObserver.onCompleted(); } }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| package com.acupt.grpc;
import io.grpc.Server; import io.grpc.ServerBuilder;
import java.io.IOException;
public class MyServer { public static void main(String[] args) throws IOException, InterruptedException { int port = 50051; Server server = ServerBuilder.forPort(port) .addService(new HelloService()) .build() .start(); System.out.println("started"); Thread.sleep(1000 * 60 * 2); server.shutdown(); System.out.println("shutdown"); } }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| package com.acupt.grpc;
import com.acupt.grpc.proto.HelloServiceGrpc; import com.acupt.grpc.proto.InvokeRequest; import com.acupt.grpc.proto.InvokeResponse; import io.grpc.Channel; import io.grpc.ManagedChannelBuilder;
public class MyClient {
public static void main(String[] args) { InvokeRequest request = InvokeRequest.newBuilder().setName("tom").build(); Channel channel = ManagedChannelBuilder.forAddress("localhost", 50051).usePlaintext(true).build(); HelloServiceGrpc.HelloServiceBlockingStub blockingStub = HelloServiceGrpc.newBlockingStub(channel); InvokeResponse response = blockingStub.hello(request); System.out.println(response.getMsg()); } }
|
先启动MyServer,成功启动后再启动MyClient
请求方式
1.阻塞
同步请求,接口返回前是阻塞的。
1 2 3
| HelloServiceGrpc.HelloServiceBlockingStub blockingStub = HelloServiceGrpc.newBlockingStub(channel); InvokeResponse response = blockingStub.hello(request); System.out.println(response.getMsg());
|
2.Future
调用后返回guava包里继承了Future的接口ListenableFuture(增加了listener支持),可以控制超时时间。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| HelloServiceGrpc.HelloServiceFutureStub futureStub = HelloServiceGrpc.newFutureStub(channel); ListenableFuture<InvokeResponse> future = futureStub.hello(request); future.addListener( () -> System.out.println("listener 1"), command -> { System.out.println("execute 1 " + command); command.run(); }); future.addListener( () -> System.out.println("listener 2"), command -> { System.out.println("execute 2 " + command); command.run(); });
System.out.println(future.get(10, TimeUnit.SECONDS));
|
3.回调
调用接口传入回调函数,调用后马上返回。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| MyClient.done = false; HelloServiceGrpc.HelloServiceStub stub = HelloServiceGrpc.newStub(channel); stub.hello(request, new StreamObserver<InvokeResponse>() { @Override public void onNext(InvokeResponse value) { System.out.println("onNext " + value); }
@Override public void onError(Throwable t) { System.out.println("onError " + t.getMessage()); t.printStackTrace(); MyClient.done = true; }
@Override public void onCompleted() { System.out.println("onCompleted"); MyClient.done = true; } }); while (!MyClient.done) { Thread.sleep(1000); }
|