小伙伴们,你们好呀,我是老寇,跟我一起学习使用Vertx实现UDP-Server
实现UDP【响应式】
Vertx-Core地址
注意
UDP是无连接的传输,这意味着您与远程客户端没有建立持续的连接。
所以,您发送和接收的数据包都要包含有远程的地址。
除此之外,UDP不像TCP的使用那样安全, 这也就意味着不能保证发送的数据包一定会被对应的接收端(Endpoint)接收。【传输数据时不建立连接,因此可能导致数据包丢失】
UDP最适合一些允许丢弃数据包的应用(如监视应用程序,视频直播)。
实现过程
查看源码
代码比较简单,懒得讲解啦
代码比较简单,懒得讲解啦
代码比较简单,懒得讲解啦
服务端
引入依赖
- <dependency>
- <groupId>io.vertx</groupId>
- vertx-core</artifactId>
- <version>5.0.0</version>
- </dependency>
复制代码 UdpServerProperties- /**
- * @author laokou
- */
- @Data
- @Component
- @ConfigurationProperties(prefix = "spring.udp-server")
- public class UdpServerProperties {
- private String host = "0.0.0.0";
- private Set<Integer> ports = new HashSet<>(0);
- private boolean broadcast = false;
- private boolean loopbackModeDisabled = true;
- private String multicastNetworkInterface = null;
- private boolean ipV6 = false;
- }
复制代码 VertxUdpServer- /**
- * @author laokou
- */
- @Slf4j
- public final class VertxUdpServer extends AbstractVerticle {
- private volatile Flux<DatagramSocket> datagramSocket;
- private final UdpServerProperties udpServerProperties;
- private boolean isClosed = false;
- VertxUdpServer(Vertx vertx, UdpServerProperties udpServerProperties) {
- this.udpServerProperties = udpServerProperties;
- this.vertx = vertx;
- }
- @Override
- public synchronized void start() {
- datagramSocket = Flux.fromIterable(udpServerProperties.getPorts()).map(port -> {
- DatagramSocket datagramSocket = vertx.createDatagramSocket(getDatagramSocketOption())
- .handler(packet -> log.info("【Vertx-UDP-Server】 => 收到数据包:{}", packet.data()));
- datagramSocket.listen(port, udpServerProperties.getHost()).onComplete(result -> {
- if (isClosed) {
- return;
- }
- if (result.succeeded()) {
- log.info("【Vertx-UDP-Server】 => UDP服务启动成功,端口:{}", port);
- }
- else {
- Throwable ex = result.cause();
- log.error("【Vertx-UDP-Server】 => UDP服务启动失败,错误信息:{}", ex.getMessage(), ex);
- }
- });
- return datagramSocket;
- });
- datagramSocket.subscribeOn(Schedulers.boundedElastic()).subscribe();
- }
- @Override
- public synchronized void stop() {
- isClosed = true;
- datagramSocket.doOnNext(socket -> socket.close().onComplete(result -> {
- if (result.succeeded()) {
- log.info("【Vertx-UDP-Server】 => UDP服务停止成功");
- }
- else {
- Throwable ex = result.cause();
- log.error("【Vertx-UDP-Server】 => UDP服务停止失败,错误信息:{}", ex.getMessage(), ex);
- }
- })).subscribeOn(Schedulers.boundedElastic()).subscribe();
- }
- public void deploy() {
- // 部署服务
- vertx.deployVerticle(this);
- // 停止服务
- Runtime.getRuntime().addShutdownHook(new Thread(this::stop));
- }
- private DatagramSocketOptions getDatagramSocketOption() {
- DatagramSocketOptions datagramSocketOptions = new DatagramSocketOptions();
- datagramSocketOptions.setBroadcast(udpServerProperties.isBroadcast());
- datagramSocketOptions.setLoopbackModeDisabled(udpServerProperties.isLoopbackModeDisabled());
- datagramSocketOptions.setMulticastNetworkInterface(udpServerProperties.getMulticastNetworkInterface());
- datagramSocketOptions.setIpV6(udpServerProperties.isIpV6());
- return datagramSocketOptions;
- }
- }
复制代码 VertxUdpServerManager- /**
- * @author laokou
- */
- public final class VertxUdpServerManager {
- private VertxUdpServerManager() {
- }
- public static void deploy(final Vertx vertx, final UdpServerProperties properties) {
- new VertxUdpServer(vertx, properties).deploy();
- }
- }
复制代码 客户端【测试】
- /**
- * @author laokou
- */
- @Slf4j
- @SpringBootTest
- @RequiredArgsConstructor
- @TestConstructor(autowireMode = TestConstructor.AutowireMode.ALL)
- class UdpTest {
- private final Vertx vertx;
- @Test
- void test() throws InterruptedException {
- for (int i = 4880; i < 5000; i++) {
- DatagramSocket datagramSocket = vertx.createDatagramSocket();
- int finalI = i;
- datagramSocket.send("Hello Vertx", i, "127.0.0.1").onComplete(result -> {
- if (result.succeeded()) {
- log.info("【Vertx-UDP-Client】 => 发送成功,端口:{}", finalI);
- }
- else {
- Throwable ex = result.cause();
- log.error("【Vertx-UDP-Client】 => 发送失败,端口:{},错误信息:{}", finalI, ex.getMessage(), ex);
- }
- });
- Thread.sleep(2000);
- Assertions.assertDoesNotThrow(datagramSocket::close);
- }
- }
- }
复制代码 这可以满足基本的协议开发,自行修改即可!!!
我是老寇,我们下次再见啦!
来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作! |