找回密码
 立即注册
首页 资源区 代码 物联网之使用Vertx实现UDP最佳实践【响应式】 ...

物联网之使用Vertx实现UDP最佳实践【响应式】

屠焘 3 天前
小伙伴们,你们好呀,我是老寇,跟我一起学习使用Vertx实现UDP-Server
实现UDP【响应式】

Vertx-Core地址
注意

UDP是无连接的传输,这意味着您与远程客户端没有建立持续的连接。
所以,您发送和接收的数据包都要包含有远程的地址。
除此之外,UDP不像TCP的使用那样安全, 这也就意味着不能保证发送的数据包一定会被对应的接收端(Endpoint)接收。【传输数据时不建立连接,因此可能导致数据包丢失
UDP最适合一些允许丢弃数据包的应用(如监视应用程序,视频直播)。
实现过程

查看源码
代码比较简单,懒得讲解啦

代码比较简单,懒得讲解啦

代码比较简单,懒得讲解啦

服务端

引入依赖
  1. <dependency>
  2.   <groupId>io.vertx</groupId>
  3.   vertx-core</artifactId>
  4.   <version>5.0.0</version>
  5. </dependency>
复制代码
UdpServerProperties
  1. /**
  2. * @author laokou
  3. */
  4. @Data
  5. @Component
  6. @ConfigurationProperties(prefix = "spring.udp-server")
  7. public class UdpServerProperties {
  8.     private String host = "0.0.0.0";
  9.     private Set<Integer> ports = new HashSet<>(0);
  10.     private boolean broadcast = false;
  11.     private boolean loopbackModeDisabled = true;
  12.     private String multicastNetworkInterface = null;
  13.     private boolean ipV6 = false;
  14. }
复制代码
VertxUdpServer
  1. /**
  2. * @author laokou
  3. */
  4. @Slf4j
  5. public final class VertxUdpServer extends AbstractVerticle {
  6.     private volatile Flux<DatagramSocket> datagramSocket;
  7.     private final UdpServerProperties udpServerProperties;
  8.     private boolean isClosed = false;
  9.     VertxUdpServer(Vertx vertx, UdpServerProperties udpServerProperties) {
  10.        this.udpServerProperties = udpServerProperties;
  11.        this.vertx = vertx;
  12.     }
  13.     @Override
  14.     public synchronized void start() {
  15.        datagramSocket = Flux.fromIterable(udpServerProperties.getPorts()).map(port -> {
  16.           DatagramSocket datagramSocket = vertx.createDatagramSocket(getDatagramSocketOption())
  17.              .handler(packet -> log.info("【Vertx-UDP-Server】 => 收到数据包:{}", packet.data()));
  18.           datagramSocket.listen(port, udpServerProperties.getHost()).onComplete(result -> {
  19.              if (isClosed) {
  20.                 return;
  21.              }
  22.              if (result.succeeded()) {
  23.                 log.info("【Vertx-UDP-Server】 => UDP服务启动成功,端口:{}", port);
  24.              }
  25.              else {
  26.                 Throwable ex = result.cause();
  27.                 log.error("【Vertx-UDP-Server】 => UDP服务启动失败,错误信息:{}", ex.getMessage(), ex);
  28.              }
  29.           });
  30.           return datagramSocket;
  31.        });
  32.        datagramSocket.subscribeOn(Schedulers.boundedElastic()).subscribe();
  33.     }
  34.     @Override
  35.     public synchronized void stop() {
  36.        isClosed = true;
  37.        datagramSocket.doOnNext(socket -> socket.close().onComplete(result -> {
  38.           if (result.succeeded()) {
  39.              log.info("【Vertx-UDP-Server】 => UDP服务停止成功");
  40.           }
  41.           else {
  42.              Throwable ex = result.cause();
  43.              log.error("【Vertx-UDP-Server】 => UDP服务停止失败,错误信息:{}", ex.getMessage(), ex);
  44.           }
  45.        })).subscribeOn(Schedulers.boundedElastic()).subscribe();
  46.     }
  47.     public void deploy() {
  48.        // 部署服务
  49.        vertx.deployVerticle(this);
  50.        // 停止服务
  51.        Runtime.getRuntime().addShutdownHook(new Thread(this::stop));
  52.     }
  53.     private DatagramSocketOptions getDatagramSocketOption() {
  54.        DatagramSocketOptions datagramSocketOptions = new DatagramSocketOptions();
  55.        datagramSocketOptions.setBroadcast(udpServerProperties.isBroadcast());
  56.        datagramSocketOptions.setLoopbackModeDisabled(udpServerProperties.isLoopbackModeDisabled());
  57.        datagramSocketOptions.setMulticastNetworkInterface(udpServerProperties.getMulticastNetworkInterface());
  58.        datagramSocketOptions.setIpV6(udpServerProperties.isIpV6());
  59.        return datagramSocketOptions;
  60.     }
  61. }
复制代码
VertxUdpServerManager
  1. /**
  2. * @author laokou
  3. */
  4. public final class VertxUdpServerManager {
  5.     private VertxUdpServerManager() {
  6.     }
  7.     public static void deploy(final Vertx vertx, final UdpServerProperties properties) {
  8.        new VertxUdpServer(vertx, properties).deploy();
  9.     }
  10. }
复制代码
客户端【测试】
  1. /**
  2. * @author laokou
  3. */
  4. @Slf4j
  5. @SpringBootTest
  6. @RequiredArgsConstructor
  7. @TestConstructor(autowireMode = TestConstructor.AutowireMode.ALL)
  8. class UdpTest {
  9.     private final Vertx vertx;
  10.     @Test
  11.     void test() throws InterruptedException {
  12.        for (int i = 4880; i < 5000; i++) {
  13.           DatagramSocket datagramSocket = vertx.createDatagramSocket();
  14.           int finalI = i;
  15.           datagramSocket.send("Hello Vertx", i, "127.0.0.1").onComplete(result -> {
  16.              if (result.succeeded()) {
  17.                 log.info("【Vertx-UDP-Client】 => 发送成功,端口:{}", finalI);
  18.              }
  19.              else {
  20.                 Throwable ex = result.cause();
  21.                 log.error("【Vertx-UDP-Client】 => 发送失败,端口:{},错误信息:{}", finalI, ex.getMessage(), ex);
  22.              }
  23.           });
  24.           Thread.sleep(2000);
  25.           Assertions.assertDoesNotThrow(datagramSocket::close);
  26.        }
  27.     }
  28. }
复制代码
这可以满足基本的协议开发,自行修改即可!!!
我是老寇,我们下次再见啦!

来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!
您需要登录后才可以回帖 登录 | 立即注册