找回密码
 立即注册
首页 资源区 代码 etcd 入门实战(3)-java 操作 etcd

etcd 入门实战(3)-java 操作 etcd

铵滔 4 天前
本文主要介绍使用 coreos 提供的 Java 客户端(jetcd)来操作 etcd,文中所使用到的软件版本:etcd 3.5.18、jetcd 0.7.7。
1、引入依赖
  1. <dependency>
  2.     <groupId>io.etcd</groupId>
  3.     jetcd-core</artifactId>
  4.     <version>0.7.7</version>
  5. </dependency>
复制代码
2、jetcd 使用

2.1、初始化客户端
  1. @Before
  2. public void before() {
  3.     client = Client.builder()
  4.             .endpoints("http://10.49.196.33:2379")
  5.             //.endpoints("http://10.49.196.30:2379", "http://10.49.196.31:2379", "http://10.49.196.33:2379")
  6.             .connectTimeout(Duration.of(10, ChronoUnit.SECONDS))
  7.             .build();
  8. }
复制代码
2.2、键值操作

A、新增/修改
  1. @Test
  2. public void kvPut() throws Exception {
  3.     KV kv = client.getKVClient();
  4.     ByteSequence key = ByteSequence.from("key2", StandardCharsets.UTF_8);
  5.     ByteSequence value = ByteSequence.from("value2", StandardCharsets.UTF_8);
  6.     CompletableFuture<PutResponse> completableFuture = kv.put(key, value);
  7.     log.info("completableFuture={}", completableFuture.get());
  8. }
复制代码
B、查询
  1. @Test
  2. public void kvGet() throws Exception {
  3.     KV kv = client.getKVClient();
  4.     ByteSequence key = ByteSequence.from("key1", StandardCharsets.UTF_8);
  5.     CompletableFuture<GetResponse> completableFuture = kv.get(key);
  6.     GetResponse getResponse = completableFuture.get();
  7.     if (getResponse.getCount() > 0) {
  8.         log.info("value={}", getResponse.getKvs().get(0).getValue());
  9.     }
  10.     key = ByteSequence.from("key", StandardCharsets.UTF_8);
  11.     GetOption getOption = GetOption.builder().isPrefix(true).build();
  12.     completableFuture = kv.get(key, getOption);//查询健以”key“开头的数据
  13.     for (KeyValue keyValue : completableFuture.get().getKvs()) {
  14.         log.info("key={},value={}", keyValue.getKey(), keyValue.getValue());
  15.     }
  16. }
复制代码
C、删除
  1. @Test
  2. public void kvDelete() throws Exception {
  3.     KV kv = client.getKVClient();
  4.     ByteSequence key = ByteSequence.from("key1", StandardCharsets.UTF_8);
  5.     CompletableFuture<DeleteResponse> completableFuture = kv.delete(key);
  6.     log.info("completableFuture={}", completableFuture.get());
  7. }
复制代码
2.3、监控
  1. @Test
  2. public void watch() throws Exception {
  3.     Watch watch = client.getWatchClient();
  4.     watch.watch(ByteSequence.from("key1", StandardCharsets.UTF_8), new Watch.Listener() {
  5.         @Override
  6.         public void onNext(WatchResponse response) {
  7.             List<WatchEvent> events = response.getEvents();
  8.             for (WatchEvent watchEvent : events) {
  9.                 log.info("eventType={},value={}", watchEvent.getEventType(), watchEvent.getKeyValue().getValue());
  10.             }
  11.         }
  12.         @Override
  13.         public void onError(Throwable throwable) {
  14.             log.error("发生异常:{}", throwable.getMessage());
  15.         }
  16.         @Override
  17.         public void onCompleted() {
  18.             log.info("complete");
  19.         }
  20.     });
  21.     Thread.sleep(1000 * 60 * 5);
  22. }
复制代码
2.4、租约
  1. @Test
  2. public void lease() throws Exception {
  3.     Lease lease = client.getLeaseClient();
  4.     //创建租约
  5.     LeaseGrantResponse leaseGrantResponse = lease.grant(10).get();
  6.     long leaseId = leaseGrantResponse.getID();
  7.     //租约与键值数据绑定
  8.     ByteSequence key = ByteSequence.from("lease-key", StandardCharsets.UTF_8);
  9.     ByteSequence value = ByteSequence.from("lease-value", StandardCharsets.UTF_8);
  10.     PutOption putOption = PutOption.builder().withLeaseId(leaseId).build();
  11.     client.getKVClient().put(key, value, putOption).get();
  12.     Thread.sleep(1000);
  13.     //查看租约剩余时间
  14.     LeaseOption leaseOption = LeaseOption.builder().withAttachedKeys().build();
  15.     LeaseTimeToLiveResponse leaseTimeToLiveResponse = lease.timeToLive(leaseId, leaseOption).get();
  16.     log.info("leaseTimeToLiveResponse={}", leaseTimeToLiveResponse);
  17.     //使租约一直有效
  18.     lease.keepAlive(leaseId, new StreamObserver<LeaseKeepAliveResponse>() {
  19.         @Override
  20.         public void onNext(LeaseKeepAliveResponse leaseKeepAliveResponse) {
  21.             log.info("Lease keep-alive response:{}", leaseGrantResponse.getTTL());
  22.         }
  23.         @Override
  24.         public void onError(Throwable throwable) {
  25.             log.info("发生异常:{}", throwable.getMessage());
  26.         }
  27.         @Override
  28.         public void onCompleted() {
  29.             log.info("Complete");
  30.         }
  31.     });
  32.     Thread.sleep(1000 * 30);
  33.     //撤销租约
  34.     LeaseRevokeResponse leaseRevokeResponse = lease.revoke(leaseId).get();
  35.     log.info("leaseRevokeResponse={}", leaseRevokeResponse);
  36. }
复制代码
2.5、锁
  1. @Test
  2. public void lock() throws Exception {
  3.     ByteSequence lockName = ByteSequence.from("my-lock", StandardCharsets.UTF_8);
  4.     for (int i = 1; i <= 3; i++) {
  5.         new Thread(() -> {
  6.             try {
  7.                 LeaseGrantResponse leaseGrantResponse = client.getLeaseClient().grant(10).get();
  8.                 long leaseId = leaseGrantResponse.getID();
  9.                 Lock lock = client.getLockClient();
  10.                 //阻塞获取锁
  11.                 LockResponse lockResponse = lock.lock(lockName, leaseId).get();
  12.                 ByteSequence lockKey = lockResponse.getKey();
  13.                 log.info("{} 获得锁 {}", Thread.currentThread().getName(), lockResponse.getKey());
  14.                 Thread.sleep(3000);
  15.                 //释放锁,租约撤销或到期也会释放锁
  16.                 lock.unlock(lockKey).get();
  17.             } catch (Exception e) {
  18.                 log.error("", e);
  19.             }
  20.         }).start();
  21.     }
  22.     Thread.sleep(1000 * 20);
  23. }
复制代码
2.6、选举
  1. @Test
  2. public void election() throws Exception {
  3.     ByteSequence electionName = ByteSequence.from("electionName", StandardCharsets.UTF_8);
  4.     ByteSequence proposal1 = ByteSequence.from("proposal1", StandardCharsets.UTF_8);
  5.     ByteSequence proposal2 = ByteSequence.from("proposal2", StandardCharsets.UTF_8);
  6.     ByteSequence proposal3 = ByteSequence.from("proposal3", StandardCharsets.UTF_8);
  7.     ByteSequence[] proposals = new ByteSequence[]{proposal1, proposal2, proposal3};
  8.     for (ByteSequence proposal : proposals) {
  9.         new Thread(() -> {
  10.             try {
  11.                 Election election = client.getElectionClient();
  12.                 //监听选举事件(可选)
  13.                 election.observe(electionName, new Election.Listener() {
  14.                     @Override
  15.                     public void onNext(LeaderResponse leaderResponse) {
  16.                         log.info("proposal={},key={},value={}", proposal, leaderResponse.getKv().getKey(), leaderResponse.getKv().getValue());
  17.                     }
  18.                     @Override
  19.                     public void onError(Throwable throwable) {
  20.                         log.error("发生异常:{}", throwable.getMessage());
  21.                     }
  22.                     @Override
  23.                     public void onCompleted() {
  24.                         log.info("complete");
  25.                     }
  26.                 });
  27.                 LeaseGrantResponse leaseGrantResponse = client.getLeaseClient().grant(5).get();
  28.                 long leaseId = leaseGrantResponse.getID();
  29.                 client.getLeaseClient().keepAlive(leaseId, null);
  30.                 //获得领导权限或租约到期退出等待
  31.                 CampaignResponse campaignResponse = election.campaign(electionName, leaseId, proposal).get();
  32.                 LeaderKey leaderKey = campaignResponse.getLeader();
  33.                 log.info("{},获得领导权,{}", proposal, leaderKey.getKey());
  34.                 //获取领导者,如果是租约到期则改行代码会抛出异常NoLeaderException
  35.                 LeaderResponse leaderResponse = election.leader(electionName).get();
  36.                 log.info("领导者:{}", leaderResponse.getKv().getValue());
  37.                 //TODO:业务处理
  38.                 Thread.sleep(1000 * 6);
  39.                 //释放领导权
  40.                 election.resign(leaderKey).get();
  41.                 client.getLeaseClient().revoke(leaseId);
  42.             } catch (Exception e) {
  43.                 log.error("", e);
  44.             }
  45.         }).start();
  46.     }
  47.     Thread.sleep(1000 * 30);
  48. }
复制代码
2.7、完整代码

1.gif
2.gif
  1. package com.abc.etcd;
  2. import io.etcd.jetcd.*;
  3. import io.etcd.jetcd.election.CampaignResponse;
  4. import io.etcd.jetcd.election.LeaderKey;
  5. import io.etcd.jetcd.election.LeaderResponse;
  6. import io.etcd.jetcd.kv.DeleteResponse;
  7. import io.etcd.jetcd.kv.GetResponse;
  8. import io.etcd.jetcd.kv.PutResponse;
  9. import io.etcd.jetcd.lease.LeaseGrantResponse;
  10. import io.etcd.jetcd.lease.LeaseKeepAliveResponse;
  11. import io.etcd.jetcd.lease.LeaseRevokeResponse;
  12. import io.etcd.jetcd.lease.LeaseTimeToLiveResponse;
  13. import io.etcd.jetcd.lock.LockResponse;
  14. import io.etcd.jetcd.options.GetOption;
  15. import io.etcd.jetcd.options.LeaseOption;
  16. import io.etcd.jetcd.options.PutOption;
  17. import io.etcd.jetcd.watch.WatchEvent;
  18. import io.etcd.jetcd.watch.WatchResponse;
  19. import io.grpc.stub.StreamObserver;
  20. import lombok.extern.slf4j.Slf4j;
  21. import org.junit.After;
  22. import org.junit.Before;
  23. import org.junit.Test;
  24. import java.nio.charset.StandardCharsets;
  25. import java.time.Duration;
  26. import java.time.temporal.ChronoUnit;
  27. import java.util.List;
  28. import java.util.concurrent.CompletableFuture;
  29. @Slf4j
  30. public class EtcdCase {
  31.     private Client client;
  32.     @Before
  33.     public void before() {
  34.         client = Client.builder()
  35.                 .endpoints("http://10.49.196.33:2379")
  36.                 //.endpoints("http://10.49.196.30:2379", "http://10.49.196.31:2379", "http://10.49.196.33:2379")
  37.                 .connectTimeout(Duration.of(10, ChronoUnit.SECONDS))
  38.                 .build();
  39.     }
  40.     @After
  41.     public void after() {
  42.         client.close();
  43.     }
  44.     @Test
  45.     public void kvPut() throws Exception {
  46.         KV kv = client.getKVClient();
  47.         ByteSequence key = ByteSequence.from("key2", StandardCharsets.UTF_8);
  48.         ByteSequence value = ByteSequence.from("value2", StandardCharsets.UTF_8);
  49.         CompletableFuture<PutResponse> completableFuture = kv.put(key, value);
  50.         log.info("completableFuture={}", completableFuture.get());
  51.     }
  52.     @Test
  53.     public void kvGet() throws Exception {
  54.         KV kv = client.getKVClient();
  55.         ByteSequence key = ByteSequence.from("key1", StandardCharsets.UTF_8);
  56.         CompletableFuture<GetResponse> completableFuture = kv.get(key);
  57.         GetResponse getResponse = completableFuture.get();
  58.         if (getResponse.getCount() > 0) {
  59.             log.info("value={}", getResponse.getKvs().get(0).getValue());
  60.         }
  61.         key = ByteSequence.from("key", StandardCharsets.UTF_8);
  62.         GetOption getOption = GetOption.builder().isPrefix(true).build();
  63.         completableFuture = kv.get(key, getOption);//查询健以”key“开头的数据
  64.         for (KeyValue keyValue : completableFuture.get().getKvs()) {
  65.             log.info("key={},value={}", keyValue.getKey(), keyValue.getValue());
  66.         }
  67.     }
  68.     @Test
  69.     public void kvDelete() throws Exception {
  70.         KV kv = client.getKVClient();
  71.         ByteSequence key = ByteSequence.from("key1", StandardCharsets.UTF_8);
  72.         CompletableFuture<DeleteResponse> completableFuture = kv.delete(key);
  73.         log.info("completableFuture={}", completableFuture.get());
  74.     }
  75.     @Test
  76.     public void watch() throws Exception {
  77.         Watch watch = client.getWatchClient();
  78.         watch.watch(ByteSequence.from("key1", StandardCharsets.UTF_8), new Watch.Listener() {
  79.             @Override
  80.             public void onNext(WatchResponse response) {
  81.                 List<WatchEvent> events = response.getEvents();
  82.                 for (WatchEvent watchEvent : events) {
  83.                     log.info("eventType={},value={}", watchEvent.getEventType(), watchEvent.getKeyValue().getValue());
  84.                 }
  85.             }
  86.             @Override
  87.             public void onError(Throwable throwable) {
  88.                 log.error("发生异常:{}", throwable.getMessage());
  89.             }
  90.             @Override
  91.             public void onCompleted() {
  92.                 log.info("complete");
  93.             }
  94.         });
  95.         Thread.sleep(1000 * 60 * 5);
  96.     }
  97.     @Test
  98.     public void lease() throws Exception {
  99.         Lease lease = client.getLeaseClient();
  100.         //创建租约
  101.         LeaseGrantResponse leaseGrantResponse = lease.grant(10).get();
  102.         long leaseId = leaseGrantResponse.getID();
  103.         //租约与键值数据绑定
  104.         ByteSequence key = ByteSequence.from("lease-key", StandardCharsets.UTF_8);
  105.         ByteSequence value = ByteSequence.from("lease-value", StandardCharsets.UTF_8);
  106.         PutOption putOption = PutOption.builder().withLeaseId(leaseId).build();
  107.         client.getKVClient().put(key, value, putOption).get();
  108.         Thread.sleep(1000);
  109.         //查看租约剩余时间
  110.         LeaseOption leaseOption = LeaseOption.builder().withAttachedKeys().build();
  111.         LeaseTimeToLiveResponse leaseTimeToLiveResponse = lease.timeToLive(leaseId, leaseOption).get();
  112.         log.info("leaseTimeToLiveResponse={}", leaseTimeToLiveResponse);
  113.         //使租约一直有效
  114.         lease.keepAlive(leaseId, new StreamObserver<LeaseKeepAliveResponse>() {
  115.             @Override
  116.             public void onNext(LeaseKeepAliveResponse leaseKeepAliveResponse) {
  117.                 log.info("Lease keep-alive response:{}", leaseGrantResponse.getTTL());
  118.             }
  119.             @Override
  120.             public void onError(Throwable throwable) {
  121.                 log.info("发生异常:{}", throwable.getMessage());
  122.             }
  123.             @Override
  124.             public void onCompleted() {
  125.                 log.info("Complete");
  126.             }
  127.         });
  128.         Thread.sleep(1000 * 30);
  129.         //撤销租约
  130.         LeaseRevokeResponse leaseRevokeResponse = lease.revoke(leaseId).get();
  131.         log.info("leaseRevokeResponse={}", leaseRevokeResponse);
  132.     }
  133.     @Test
  134.     public void lock() throws Exception {
  135.         ByteSequence lockName = ByteSequence.from("my-lock", StandardCharsets.UTF_8);
  136.         for (int i = 1; i <= 3; i++) {
  137.             new Thread(() -> {
  138.                 try {
  139.                     LeaseGrantResponse leaseGrantResponse = client.getLeaseClient().grant(10).get();
  140.                     long leaseId = leaseGrantResponse.getID();
  141.                     Lock lock = client.getLockClient();
  142.                     //阻塞获取锁
  143.                     LockResponse lockResponse = lock.lock(lockName, leaseId).get();
  144.                     ByteSequence lockKey = lockResponse.getKey();
  145.                     log.info("{} 获得锁 {}", Thread.currentThread().getName(), lockResponse.getKey());
  146.                     Thread.sleep(3000);
  147.                     //释放锁,租约撤销或到期也会释放锁
  148.                     lock.unlock(lockKey).get();
  149.                 } catch (Exception e) {
  150.                     log.error("", e);
  151.                 }
  152.             }).start();
  153.         }
  154.         Thread.sleep(1000 * 20);
  155.     }
  156.     @Test
  157.     public void election() throws Exception {
  158.         ByteSequence electionName = ByteSequence.from("electionName", StandardCharsets.UTF_8);
  159.         ByteSequence proposal1 = ByteSequence.from("proposal1", StandardCharsets.UTF_8);
  160.         ByteSequence proposal2 = ByteSequence.from("proposal2", StandardCharsets.UTF_8);
  161.         ByteSequence proposal3 = ByteSequence.from("proposal3", StandardCharsets.UTF_8);
  162.         ByteSequence[] proposals = new ByteSequence[]{proposal1, proposal2, proposal3};
  163.         for (ByteSequence proposal : proposals) {
  164.             new Thread(() -> {
  165.                 try {
  166.                     Election election = client.getElectionClient();
  167.                     //监听选举事件(可选)
  168.                     election.observe(electionName, new Election.Listener() {
  169.                         @Override
  170.                         public void onNext(LeaderResponse leaderResponse) {
  171.                             log.info("proposal={},key={},value={}", proposal, leaderResponse.getKv().getKey(), leaderResponse.getKv().getValue());
  172.                         }
  173.                         @Override
  174.                         public void onError(Throwable throwable) {
  175.                             log.error("发生异常:{}", throwable.getMessage());
  176.                         }
  177.                         @Override
  178.                         public void onCompleted() {
  179.                             log.info("complete");
  180.                         }
  181.                     });
  182.                     LeaseGrantResponse leaseGrantResponse = client.getLeaseClient().grant(5).get();
  183.                     long leaseId = leaseGrantResponse.getID();
  184.                     client.getLeaseClient().keepAlive(leaseId, null);
  185.                     //获得领导权限或租约到期退出等待
  186.                     CampaignResponse campaignResponse = election.campaign(electionName, leaseId, proposal).get();
  187.                     LeaderKey leaderKey = campaignResponse.getLeader();
  188.                     log.info("{},获得领导权,{}", proposal, leaderKey.getKey());
  189.                     //获取领导者,如果是租约到期则改行代码会抛出异常NoLeaderException
  190.                     LeaderResponse leaderResponse = election.leader(electionName).get();
  191.                     log.info("领导者:{}", leaderResponse.getKv().getValue());
  192.                     //TODO:业务处理
  193.                     Thread.sleep(1000 * 6);
  194.                     //释放领导权
  195.                     election.resign(leaderKey).get();
  196.                     client.getLeaseClient().revoke(leaseId);
  197.                 } catch (Exception e) {
  198.                     log.error("", e);
  199.                 }
  200.             }).start();
  201.         }
  202.         Thread.sleep(1000 * 30);
  203.     }
  204.    
  205. }
复制代码
EtcdCase.java 
参考:
https://github.com/etcd-io/jetcd。
 

来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!
您需要登录后才可以回帖 登录 | 立即注册