本文主要介绍使用 coreos 提供的 Java 客户端(jetcd)来操作 etcd,文中所使用到的软件版本:etcd 3.5.18、jetcd 0.7.7。
1、引入依赖
- <dependency>
- <groupId>io.etcd</groupId>
- <artifactId>jetcd-core</artifactId>
- <version>0.7.7</version>
- </dependency>
2、jetcd 使用
2.1、初始化客户端
- @Before
- public void before() {
- client = Client.builder()
- .endpoints("http://10.49.196.33:2379")
- //.endpoints("http://10.49.196.30:2379", "http://10.49.196.31:2379", "http://10.49.196.33:2379")
- .connectTimeout(Duration.of(10, ChronoUnit.SECONDS))
- .build();
- }
2.2、键值操作
A、新增/修改
- @Test
- // Writes "key2" -> "value2" (creating or overwriting the key) and logs the put response.
- public void kvPut() throws Exception {
-     ByteSequence putKey = ByteSequence.from("key2", StandardCharsets.UTF_8);
-     ByteSequence putValue = ByteSequence.from("value2", StandardCharsets.UTF_8);
-     // put() returns a CompletableFuture; get() waits for the result.
-     PutResponse putResponse = client.getKVClient().put(putKey, putValue).get();
-     log.info("completableFuture={}", putResponse);
- }
B、查询
- @Test
- // Reads the single key "key1", then every key starting with "key", and logs what it finds.
- public void kvGet() throws Exception {
-     KV kvClient = client.getKVClient();
-     // Exact-key lookup; only log when at least one entry came back.
-     GetResponse single = kvClient.get(ByteSequence.from("key1", StandardCharsets.UTF_8)).get();
-     if (single.getCount() > 0) {
-         log.info("value={}", single.getKvs().get(0).getValue());
-     }
-     // Prefix lookup: query the entries whose key begins with "key".
-     ByteSequence prefix = ByteSequence.from("key", StandardCharsets.UTF_8);
-     GetOption prefixOption = GetOption.builder().isPrefix(true).build();
-     List<KeyValue> matches = kvClient.get(prefix, prefixOption).get().getKvs();
-     for (KeyValue match : matches) {
-         log.info("key={},value={}", match.getKey(), match.getValue());
-     }
- }
C、删除
- @Test
- // Deletes "key1" and logs the delete response.
- public void kvDelete() throws Exception {
-     ByteSequence doomedKey = ByteSequence.from("key1", StandardCharsets.UTF_8);
-     // delete() returns a CompletableFuture; get() waits for the result.
-     DeleteResponse deleteResponse = client.getKVClient().delete(doomedKey).get();
-     log.info("completableFuture={}", deleteResponse);
- }
2.3、监控
- /**
-  * Watches "key1" and logs each event reported for it, keeping the test alive for five minutes.
-  */
- @Test
- public void watch() throws Exception {
-     ByteSequence watchedKey = ByteSequence.from("key1", StandardCharsets.UTF_8);
-     Watch.Listener listener = new Watch.Listener() {
-         @Override
-         public void onNext(WatchResponse response) {
-             // A response carries a list of events; log each one.
-             for (WatchEvent event : response.getEvents()) {
-                 log.info("eventType={},value={}", event.getEventType(), event.getKeyValue().getValue());
-             }
-         }
-         @Override
-         public void onError(Throwable throwable) {
-             log.error("发生异常:{}", throwable.getMessage());
-         }
-         @Override
-         public void onCompleted() {
-             log.info("complete");
-         }
-     };
-     client.getWatchClient().watch(watchedKey, listener);
-     // Block for five minutes so the listener can receive events before the test returns.
-     Thread.sleep(5 * 60 * 1000);
- }
2.4、租约
- @Test
- public void lease() throws Exception {
- Lease lease = client.getLeaseClient();
- //创建租约
- LeaseGrantResponse leaseGrantResponse = lease.grant(10).get();
- long leaseId = leaseGrantResponse.getID();
- //租约与键值数据绑定
- ByteSequence key = ByteSequence.from("lease-key", StandardCharsets.UTF_8);
- ByteSequence value = ByteSequence.from("lease-value", StandardCharsets.UTF_8);
- PutOption putOption = PutOption.builder().withLeaseId(leaseId).build();
- client.getKVClient().put(key, value, putOption).get();
- Thread.sleep(1000);
- //查看租约剩余时间
- LeaseOption leaseOption = LeaseOption.builder().withAttachedKeys().build();
- LeaseTimeToLiveResponse leaseTimeToLiveResponse = lease.timeToLive(leaseId, leaseOption).get();
- log.info("leaseTimeToLiveResponse={}", leaseTimeToLiveResponse);
- //使租约一直有效
- lease.keepAlive(leaseId, new StreamObserver<LeaseKeepAliveResponse>() {
- @Override
- public void onNext(LeaseKeepAliveResponse leaseKeepAliveResponse) {
- log.info("Lease keep-alive response:{}", leaseGrantResponse.getTTL());
- }
- @Override
- public void onError(Throwable throwable) {
- log.info("发生异常:{}", throwable.getMessage());
- }
- @Override
- public void onCompleted() {
- log.info("Complete");
- }
- });
- Thread.sleep(1000 * 30);
- //撤销租约
- LeaseRevokeResponse leaseRevokeResponse = lease.revoke(leaseId).get();
- log.info("leaseRevokeResponse={}", leaseRevokeResponse);
- }
2.5、锁
- /**
-  * Starts three threads that compete for the lock "my-lock". Each one grants its own lease,
-  * waits for the lock, holds it for three seconds and unlocks. The test thread waits 20 seconds.
-  */
- @Test
- public void lock() throws Exception {
-     ByteSequence lockName = ByteSequence.from("my-lock", StandardCharsets.UTF_8);
-     Runnable contender = () -> {
-         try {
-             long leaseId = client.getLeaseClient().grant(10).get().getID();
-             Lock lockClient = client.getLockClient();
-             // Blocks until the lock has been acquired.
-             ByteSequence lockKey = lockClient.lock(lockName, leaseId).get().getKey();
-             log.info("{} 获得锁 {}", Thread.currentThread().getName(), lockKey);
-             Thread.sleep(3000);
-             // Release the lock; revoking the lease or letting it expire releases it as well.
-             lockClient.unlock(lockKey).get();
-         } catch (Exception e) {
-             log.error("", e);
-         }
-     };
-     for (int started = 0; started < 3; started++) {
-         new Thread(contender).start();
-     }
-     Thread.sleep(1000 * 20);
- }
2.6、选举
- /**
-  * Runs three candidates, one thread each, in the same election. Every candidate observes the
-  * election, campaigns with its own lease and proposal, stays leader for six seconds, then
-  * resigns and revokes its lease. The test thread waits 30 seconds for them to finish.
-  */
- @Test
- public void election() throws Exception {
-     ByteSequence electionName = ByteSequence.from("electionName", StandardCharsets.UTF_8);
-     ByteSequence proposal1 = ByteSequence.from("proposal1", StandardCharsets.UTF_8);
-     ByteSequence proposal2 = ByteSequence.from("proposal2", StandardCharsets.UTF_8);
-     ByteSequence proposal3 = ByteSequence.from("proposal3", StandardCharsets.UTF_8);
-     ByteSequence[] proposals = new ByteSequence[]{proposal1, proposal2, proposal3};
-     for (ByteSequence proposal : proposals) {
-         new Thread(() -> {
-             try {
-                 Election election = client.getElectionClient();
-                 // Observe election events (optional).
-                 election.observe(electionName, new Election.Listener() {
-                     @Override
-                     public void onNext(LeaderResponse leaderResponse) {
-                         log.info("proposal={},key={},value={}", proposal, leaderResponse.getKv().getKey(), leaderResponse.getKv().getValue());
-                     }
-                     @Override
-                     public void onError(Throwable throwable) {
-                         log.error("发生异常:{}", throwable.getMessage());
-                     }
-                     @Override
-                     public void onCompleted() {
-                         log.info("complete");
-                     }
-                 });
-                 LeaseGrantResponse leaseGrantResponse = client.getLeaseClient().grant(5).get();
-                 long leaseId = leaseGrantResponse.getID();
-                 // Keep the lease alive. A real observer is passed instead of null so that
-                 // renewal failures are logged and no callback is ever invoked on a null reference.
-                 client.getLeaseClient().keepAlive(leaseId, new StreamObserver<LeaseKeepAliveResponse>() {
-                     @Override
-                     public void onNext(LeaseKeepAliveResponse leaseKeepAliveResponse) {
-                         // Renewal acknowledged; nothing to do.
-                     }
-                     @Override
-                     public void onError(Throwable throwable) {
-                         log.error("发生异常:{}", throwable.getMessage());
-                     }
-                     @Override
-                     public void onCompleted() {
-                         // Nothing to do.
-                     }
-                 });
-                 // Waits until leadership is obtained or the lease expires.
-                 CampaignResponse campaignResponse = election.campaign(electionName, leaseId, proposal).get();
-                 LeaderKey leaderKey = campaignResponse.getLeader();
-                 log.info("{},获得领导权,{}", proposal, leaderKey.getKey());
-                 // Fetch the current leader; if the wait ended because the lease expired,
-                 // this line throws NoLeaderException.
-                 LeaderResponse leaderResponse = election.leader(electionName).get();
-                 log.info("领导者:{}", leaderResponse.getKv().getValue());
-                 // TODO: business logic
-                 Thread.sleep(1000 * 6);
-                 // Give up leadership.
-                 election.resign(leaderKey).get();
-                 // Wait for the revoke so a failure reaches the catch block instead of being dropped.
-                 client.getLeaseClient().revoke(leaseId).get();
-             } catch (Exception e) {
-                 log.error("", e);
-             }
-         }).start();
-     }
-     Thread.sleep(1000 * 30);
- }
2.7、完整代码
- package com.abc.etcd;
- import io.etcd.jetcd.*;
- import io.etcd.jetcd.election.CampaignResponse;
- import io.etcd.jetcd.election.LeaderKey;
- import io.etcd.jetcd.election.LeaderResponse;
- import io.etcd.jetcd.kv.DeleteResponse;
- import io.etcd.jetcd.kv.GetResponse;
- import io.etcd.jetcd.kv.PutResponse;
- import io.etcd.jetcd.lease.LeaseGrantResponse;
- import io.etcd.jetcd.lease.LeaseKeepAliveResponse;
- import io.etcd.jetcd.lease.LeaseRevokeResponse;
- import io.etcd.jetcd.lease.LeaseTimeToLiveResponse;
- import io.etcd.jetcd.lock.LockResponse;
- import io.etcd.jetcd.options.GetOption;
- import io.etcd.jetcd.options.LeaseOption;
- import io.etcd.jetcd.options.PutOption;
- import io.etcd.jetcd.watch.WatchEvent;
- import io.etcd.jetcd.watch.WatchResponse;
- import io.grpc.stub.StreamObserver;
- import lombok.extern.slf4j.Slf4j;
- import org.junit.After;
- import org.junit.Before;
- import org.junit.Test;
- import java.nio.charset.StandardCharsets;
- import java.time.Duration;
- import java.time.temporal.ChronoUnit;
- import java.util.List;
- import java.util.concurrent.CompletableFuture;
- /**
-  * JUnit 4 samples showing the jetcd client API: key/value operations, watch, lease, lock and
-  * election. Every test talks to the etcd endpoint configured in {@link #before()}.
-  */
- @Slf4j
- public class EtcdCase {
-     private Client client;
-     /**
-      * Builds the etcd client shared by the tests: one endpoint, 10-second connect timeout.
-      */
-     @Before
-     public void before() {
-         client = Client.builder()
-                 // Single-node endpoint. For a cluster, list every member instead, e.g.
-                 // .endpoints("http://10.49.196.30:2379", "http://10.49.196.31:2379", "http://10.49.196.33:2379")
-                 .endpoints("http://10.49.196.33:2379")
-                 .connectTimeout(Duration.of(10, ChronoUnit.SECONDS))
-                 .build();
-     }
-     /**
-      * Closes the client after each test.
-      */
-     @After
-     public void after() {
-         // Guard against before() having failed before the client was assigned.
-         if (client != null) {
-             client.close();
-         }
-     }
-     /**
-      * Writes "key2" -> "value2" (creating or overwriting the key) and logs the put response.
-      */
-     @Test
-     public void kvPut() throws Exception {
-         KV kv = client.getKVClient();
-         ByteSequence key = ByteSequence.from("key2", StandardCharsets.UTF_8);
-         ByteSequence value = ByteSequence.from("value2", StandardCharsets.UTF_8);
-         // put() returns a CompletableFuture; get() waits for the result.
-         CompletableFuture<PutResponse> completableFuture = kv.put(key, value);
-         log.info("completableFuture={}", completableFuture.get());
-     }
-     /**
-      * Reads the single key "key1", then every key starting with "key", and logs what it finds.
-      */
-     @Test
-     public void kvGet() throws Exception {
-         KV kv = client.getKVClient();
-         // Exact-key lookup; only log when at least one entry came back.
-         ByteSequence key = ByteSequence.from("key1", StandardCharsets.UTF_8);
-         GetResponse getResponse = kv.get(key).get();
-         if (getResponse.getCount() > 0) {
-             log.info("value={}", getResponse.getKvs().get(0).getValue());
-         }
-         // Prefix lookup: query the entries whose key begins with "key".
-         ByteSequence prefix = ByteSequence.from("key", StandardCharsets.UTF_8);
-         GetOption getOption = GetOption.builder().isPrefix(true).build();
-         for (KeyValue keyValue : kv.get(prefix, getOption).get().getKvs()) {
-             log.info("key={},value={}", keyValue.getKey(), keyValue.getValue());
-         }
-     }
-     /**
-      * Deletes "key1" and logs the delete response.
-      */
-     @Test
-     public void kvDelete() throws Exception {
-         KV kv = client.getKVClient();
-         ByteSequence key = ByteSequence.from("key1", StandardCharsets.UTF_8);
-         CompletableFuture<DeleteResponse> completableFuture = kv.delete(key);
-         log.info("completableFuture={}", completableFuture.get());
-     }
-     /**
-      * Watches "key1" and logs each event reported for it, keeping the test alive for five minutes.
-      */
-     @Test
-     public void watch() throws Exception {
-         Watch watch = client.getWatchClient();
-         watch.watch(ByteSequence.from("key1", StandardCharsets.UTF_8), new Watch.Listener() {
-             @Override
-             public void onNext(WatchResponse response) {
-                 // A response carries a list of events; log each one.
-                 List<WatchEvent> events = response.getEvents();
-                 for (WatchEvent watchEvent : events) {
-                     log.info("eventType={},value={}", watchEvent.getEventType(), watchEvent.getKeyValue().getValue());
-                 }
-             }
-             @Override
-             public void onError(Throwable throwable) {
-                 log.error("发生异常:{}", throwable.getMessage());
-             }
-             @Override
-             public void onCompleted() {
-                 log.info("complete");
-             }
-         });
-         // Block for five minutes so the listener can receive events before the test returns.
-         Thread.sleep(1000 * 60 * 5);
-     }
-     /**
-      * Walks through the lease lifecycle: grant a lease with TTL 10, attach a key to it, query
-      * the remaining time to live, keep the lease alive for 30 seconds, then revoke it.
-      */
-     @Test
-     public void lease() throws Exception {
-         Lease lease = client.getLeaseClient();
-         // Create the lease.
-         LeaseGrantResponse leaseGrantResponse = lease.grant(10).get();
-         long leaseId = leaseGrantResponse.getID();
-         // Bind a key/value pair to the lease.
-         ByteSequence key = ByteSequence.from("lease-key", StandardCharsets.UTF_8);
-         ByteSequence value = ByteSequence.from("lease-value", StandardCharsets.UTF_8);
-         PutOption putOption = PutOption.builder().withLeaseId(leaseId).build();
-         client.getKVClient().put(key, value, putOption).get();
-         Thread.sleep(1000);
-         // Query the remaining time to live, including the keys attached to the lease.
-         LeaseOption leaseOption = LeaseOption.builder().withAttachedKeys().build();
-         LeaseTimeToLiveResponse leaseTimeToLiveResponse = lease.timeToLive(leaseId, leaseOption).get();
-         log.info("leaseTimeToLiveResponse={}", leaseTimeToLiveResponse);
-         // Keep the lease valid by renewing it continuously.
-         lease.keepAlive(leaseId, new StreamObserver<LeaseKeepAliveResponse>() {
-             @Override
-             public void onNext(LeaseKeepAliveResponse leaseKeepAliveResponse) {
-                 // Log the TTL reported by this renewal, not the TTL of the original grant.
-                 log.info("Lease keep-alive response:{}", leaseKeepAliveResponse.getTTL());
-             }
-             @Override
-             public void onError(Throwable throwable) {
-                 // Failures are logged at error level, like the other error callbacks.
-                 log.error("发生异常:{}", throwable.getMessage());
-             }
-             @Override
-             public void onCompleted() {
-                 log.info("Complete");
-             }
-         });
-         Thread.sleep(1000 * 30);
-         // Revoke the lease.
-         LeaseRevokeResponse leaseRevokeResponse = lease.revoke(leaseId).get();
-         log.info("leaseRevokeResponse={}", leaseRevokeResponse);
-     }
-     /**
-      * Starts three threads that compete for the lock "my-lock". Each one grants its own lease,
-      * waits for the lock, holds it for three seconds and unlocks. The test thread waits 20 seconds.
-      */
-     @Test
-     public void lock() throws Exception {
-         ByteSequence lockName = ByteSequence.from("my-lock", StandardCharsets.UTF_8);
-         for (int i = 1; i <= 3; i++) {
-             new Thread(() -> {
-                 try {
-                     LeaseGrantResponse leaseGrantResponse = client.getLeaseClient().grant(10).get();
-                     long leaseId = leaseGrantResponse.getID();
-                     Lock lock = client.getLockClient();
-                     // Blocks until the lock has been acquired.
-                     LockResponse lockResponse = lock.lock(lockName, leaseId).get();
-                     ByteSequence lockKey = lockResponse.getKey();
-                     log.info("{} 获得锁 {}", Thread.currentThread().getName(), lockKey);
-                     Thread.sleep(3000);
-                     // Release the lock; revoking the lease or letting it expire releases it as well.
-                     lock.unlock(lockKey).get();
-                 } catch (Exception e) {
-                     log.error("", e);
-                 }
-             }).start();
-         }
-         Thread.sleep(1000 * 20);
-     }
-     /**
-      * Runs three candidates, one thread each, in the same election. Every candidate observes the
-      * election, campaigns with its own lease and proposal, stays leader for six seconds, then
-      * resigns and revokes its lease. The test thread waits 30 seconds for them to finish.
-      */
-     @Test
-     public void election() throws Exception {
-         ByteSequence electionName = ByteSequence.from("electionName", StandardCharsets.UTF_8);
-         ByteSequence proposal1 = ByteSequence.from("proposal1", StandardCharsets.UTF_8);
-         ByteSequence proposal2 = ByteSequence.from("proposal2", StandardCharsets.UTF_8);
-         ByteSequence proposal3 = ByteSequence.from("proposal3", StandardCharsets.UTF_8);
-         ByteSequence[] proposals = new ByteSequence[]{proposal1, proposal2, proposal3};
-         for (ByteSequence proposal : proposals) {
-             new Thread(() -> {
-                 try {
-                     Election election = client.getElectionClient();
-                     // Observe election events (optional).
-                     election.observe(electionName, new Election.Listener() {
-                         @Override
-                         public void onNext(LeaderResponse leaderResponse) {
-                             log.info("proposal={},key={},value={}", proposal, leaderResponse.getKv().getKey(), leaderResponse.getKv().getValue());
-                         }
-                         @Override
-                         public void onError(Throwable throwable) {
-                             log.error("发生异常:{}", throwable.getMessage());
-                         }
-                         @Override
-                         public void onCompleted() {
-                             log.info("complete");
-                         }
-                     });
-                     LeaseGrantResponse leaseGrantResponse = client.getLeaseClient().grant(5).get();
-                     long leaseId = leaseGrantResponse.getID();
-                     // Keep the lease alive. A real observer is passed instead of null so that
-                     // renewal failures are logged and no callback is ever invoked on a null reference.
-                     client.getLeaseClient().keepAlive(leaseId, new StreamObserver<LeaseKeepAliveResponse>() {
-                         @Override
-                         public void onNext(LeaseKeepAliveResponse leaseKeepAliveResponse) {
-                             // Renewal acknowledged; nothing to do.
-                         }
-                         @Override
-                         public void onError(Throwable throwable) {
-                             log.error("发生异常:{}", throwable.getMessage());
-                         }
-                         @Override
-                         public void onCompleted() {
-                             // Nothing to do.
-                         }
-                     });
-                     // Waits until leadership is obtained or the lease expires.
-                     CampaignResponse campaignResponse = election.campaign(electionName, leaseId, proposal).get();
-                     LeaderKey leaderKey = campaignResponse.getLeader();
-                     log.info("{},获得领导权,{}", proposal, leaderKey.getKey());
-                     // Fetch the current leader; if the wait ended because the lease expired,
-                     // this line throws NoLeaderException.
-                     LeaderResponse leaderResponse = election.leader(electionName).get();
-                     log.info("领导者:{}", leaderResponse.getKv().getValue());
-                     // TODO: business logic
-                     Thread.sleep(1000 * 6);
-                     // Give up leadership.
-                     election.resign(leaderKey).get();
-                     // Wait for the revoke so a failure reaches the catch block instead of being dropped.
-                     client.getLeaseClient().revoke(leaseId).get();
-                 } catch (Exception e) {
-                     log.error("", e);
-                 }
-             }).start();
-         }
-         Thread.sleep(1000 * 30);
-     }
- }
EtcdCase.java
参考:
https://github.com/etcd-io/jetcd。
来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!