zookeeper为了防止,系统宕机或重启导致的数据丢失,会对数据进行定时持久化。有两种持久化方式:
1.为每次事务操作记录到日志文件,这样就可以通过执行这些日志文件来恢复数据。
2.为了加快ZooKeeper恢复的速度,ZooKeeper还提供了对树结构和session信息进行数据快照持久化的操作。
日志文件
日志文件记录zookeeper服务器上的每一次事务操作。
日志文件格式:log.ZXID,ZXID非常重要,它表示该文件起始的事务id。
数据快照
数据快照用来记录zookeeper服务器上某一时刻的全量内存数据内容,并写入指定磁盘文件中。
数据快照文件格式:snapshot.ZXID,ZXID非常重要,ZooKeeper会根据ZXID来确定数据恢复的起始点。
镜像文件主要存储zookeeper的树结构和session信息。
类图
FileTxnSnapLog
是操作数据持久化的核心类,底层通过TxnLog和SnapShot来分别操作日志文件和数据快照。
存储数据快照
public void save(DataTree dataTree,
ConcurrentHashMap<Long, Integer> sessionsWithTimeouts)
throws IOException {
long lastZxid = dataTree.lastProcessedZxid;
LOG.info("Snapshotting: " + Long.toHexString(lastZxid));
File snapshot=new File(
snapDir, Util.makeSnapshotName(lastZxid));
snapLog.serialize(dataTree, sessionsWithTimeouts, snapshot);
}
日志文件操作
public boolean append(Request si) throws IOException {
return txnLog.append(si.hdr, si.txn);
}
public void commit() throws IOException {
txnLog.commit();
}
public void rollLog() throws IOException {
txnLog.rollLog();
}
数据恢复
public long restore(DataTree dt, Map<Long, Integer> sessions,
PlayBackListener listener) throws IOException {
snapLog.deserialize(dt, sessions);
FileTxnLog txnLog = new FileTxnLog(dataDir);
TxnIterator itr = txnLog.read(dt.lastProcessedZxid+1);
long highestZxid = dt.lastProcessedZxid;
TxnHeader hdr;
while (true) {
hdr = itr.getHeader();
...if (hdr.getZxid() < highestZxid && highestZxid != 0) {
LOG.error(highestZxid + "(higestZxid) > "
+ hdr.getZxid() + "(next log) for type "
+ hdr.getType());
} else {
highestZxid = hdr.getZxid();
}
try {
processTransaction(hdr,dt,sessions, itr.getTxn());
} catch(KeeperException.NoNodeException e) {
throw new IOException("Failed to process transaction type: " +
hdr.getType() + " error: " + e.getMessage());
}
if (!itr.next())
break;
}
return highestZxid;
}
FileTxnLog
负责维护事务日志对外的接口,包括事务日志的写入和读取等。
写入事务日志
1.如果日志文件打开,使用该日志文件;如果没有,使用该事务的zxid做为后缀,创建新的日志文件。
2.如果当前日志文件剩余空间不足4kb,对日志文件扩容到64mb,使用0来填充。预分配的原因是提高io效率。
3.对事务的头和事务体序列号
4.生成checksum
5.写入文件流。
public synchronized boolean append(TxnHeader hdr, Record txn)
throws IOException
{
if (hdr != null) {
...
if (logStream==null) {
...
logFileWrite = new File(logDir, ("log." +
Long.toHexString(hdr.getZxid())));
fos = new FileOutputStream(logFileWrite);
logStream=new BufferedOutputStream(fos);
oa = BinaryOutputArchive.getArchive(logStream);
FileHeader fhdr = new FileHeader(TXNLOG_MAGIC,VERSION, dbId);
fhdr.serialize(oa, "fileheader");
// Make sure that the magic number is written before padding.
logStream.flush();
currentSize = fos.getChannel().position();
streamsToFlush.add(fos);
}
padFile(fos);
byte[] buf = Util.marshallTxnEntry(hdr, txn);
...
Checksum crc = makeChecksumAlgorithm();
crc.update(buf, 0, buf.length);
oa.writeLong(crc.getValue(), "txnEntryCRC");
Util.writeTxnBytes(oa, buf);
return true;
}
return false;
}
持久化本质是将内存中对象数据以二进制的方式存储到磁盘上,这个过程,底层通过jute来序列号。
序列化和反序列化的本质就是数据流与对象数据之间的变换。jute的序列化理念是让需要序列化的对象自己定义序列化协议。所以使用jute进行序列化的对象需要实现Record接口,该接口需要对象实现序列化和反序列化方法。此外jute还对序列化的流进行了抽象,OutputArchive代表输入流,InputArchive代表输入流,各种类型流的读写通过实现这两个接口实现。通过实现Record接口,对象定义序列化和反序列化的协议;通过实现OutputArchive和InputArchive,实现数据存储和读取。
Record代码:
1 public interface Record { 2 public void serialize(OutputArchive archive, String tag) 3 throws IOException; 4 public void deserialize(InputArchive archive, String tag) 5 throws IOException; 6 }