ZooKeeper 分布式队列实战
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.apache.zookeeper.*;
public class DistributedQueue {
private ZooKeeper zk;
private String queuePath;
public DistributedQueue(String hostPort, String queuePath) throws Exception {
this.queuePath = queuePath;
zk = new ZooKeeper(hostPort, 30000, new Watcher() {
@Override
public void process(WatchedEvent event) {
// 处理事件
}
});
if (zk.exists(queuePath, false) == null) {
zk.create(queuePath, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
}
public void put(byte[] data) throws KeeperException, InterruptedException {
String newPath = zk.create(queuePath + "/item_", data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL);
System.out.println("Put: " + newPath);
}
public byte[] take() throws KeeperException, InterruptedException {
List<String> items = zk.getChildren(queuePath, true);
byte[] data = null;
if (items.size() == 0) {
synchronized (this) {
wait();
}
} else {
String minItem = Collections.min(items, new Comparator<String>() {
@Override
public int compare(String lhs, String rhs) {
return Integer.parseInt(lhs.substring(5)) - Integer.parseInt(rhs.substring(5));
}
});
String itemPath = queuePath + "/" + minItem;
data = zk.getData(itemPath, false, null);
zk.delete(itemPath, -1);
System.out.println("Take: " + new String(data));
}
return data;
}
public static void main(String[] args) {
try {
DistributedQueue queue = new DistributedQueue("localhost:2181", "/queue");
queue.put("Hello".getBytes());
queue.put("World".getBytes());
byte[] data = queue.take();
// 处理数据...
} catch (Exception e) {
e.printStackTrace();
}
}
}
这个简化版的示例展示了如何使用 ZooKeeper 实现一个简单的分布式队列。它创建了一个 DistributedQueue 类,其中包含了用于添加和删除队列项的方法:put 方法负责向队列中添加数据,take 方法负责从队列中取出数据。这个例子假设 ZooKeeper 服务器运行在本地主机的 2181 端口,队列的根路径是 /queue。
评论已关闭