如何在Debian 9上备份,导入和迁移Apache Kafka数据

备份Apache Kafka数据是一项重要的实践,可帮助您从因意外用户错误而导致的意外数据丢失或添加到群集的错误数据中恢复。在本教程中,您将在单个Debian 9安装以及单独服务器上的多个Debian 9安装上备份,导入和迁移Kafka数据。 ZooKeeper是Kafka操作的关键组件,您还将在本教程中备份ZooKeeper的数据。

作者选择Tech Education Fund作为Write for DOnations计划的一部分进行捐赠。

介绍

备份Apache Kafka数据是一项重要的实践,可帮助您从因意外用户错误而导致的意外数据丢失或添加到群集的错误数据中恢复。 群集和主题数据的数据转储是执行备份和还原的有效方法。

将Kafka实例因服务器硬件或网络故障而无法使用而需要使用旧数据创建新的Kafka实例时,将备份数据导入并迁移到单独的服务器会很有帮助。 当您将Kafka实例移动到升级或降级的服务器时,由于资源使用的更改,导入和迁移备份数据也很有用。

在本教程中,您将在单个Debian 9安装以及单独服务器上的多个Debian 9安装上备份,导入和迁移Kafka数据。 ZooKeeper是Kafka运营的重要组成部分。 它存储有关群集状态的信息,例如消费者数据,分区数据以及群集中其他代理的状态。 因此,您还将在本教程中备份ZooKeeper的数据。

先决条件

要继续,您将需要:

  • 一个Debian 9服务器,具有至少4GB的RAM和非root sudo用户,按照本教程设置
  • 安装了Apache Kafka的Debian 9服务器,作为备份源。 如果源服务器上尚未安装Kafka,请按照如何在Debian 9上安装Apache Kafka指南来设置Kafka安装。
  • OpenJDK 8安装在服务器上。 要安装此版本,请按照这些说明安装特定版本的OpenJDK。
  • 第7步的可选项 - 安装了Apache Kafka的另一台Debian 9服务器,作为备份的目标。 按照上一个先决条件中的文章链接在目标服务器上安装Kafka。 只有在将Kafka数据从一台服务器移动到另一台服务器时,才需要此先决条件。 如果要备份并将Kafka数据导入单个服务器,则可以跳过此先决条件。

第1步 - 创建测试主题并添加消息

Kafka 消息是Kafka中最基本的数据存储单元,是您将向Kafka发布和订阅的实体。 Kafka 主题就像是一组相关消息的容器。 订阅特定主题时,您将只收到发布到该特定主题的消息。 在本节中,您将登录要备份的服务器(源服务器)并添加Kafka主题和消息,以便为备份填充一些数据。

本教程假设您已在kafka用户( /home/kafka/kafka )的主目录中安装了Kafka。 如果您的安装位于不同的目录中,请使用Kafka安装的路径修改以下命令中的~/ kafka部分,并在本教程的其余部分中修改命令。

通过执行以下命令SSH到源服务器:

ssh sammy@source_server_ip

运行以下命令以kafka用户身份登录:

sudo -iu kafka

通过键入以下命令,使用Kafka安装的bin目录中的kafka-topics.sh shell实用程序文件创建名为BackupTopic的主题:

~/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic BackupTopic

使用~/ kafka /bin/kafka-console-producer.sh shell实用程序脚本将字符串"Test Message 1"BackupTopic主题。

如果您想在此处添加其他消息,则可以立即执行此操作。

echo "Test Message 1" | ~/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic BackupTopic > /dev/null

~/ kafka /bin/kafka-console-producer.sh文件允许您直接从命令行发布消息。 通常,您将在程序中使用Kafka客户端库发布消息,但由于这涉及不同编程语言的不同设置,因此您可以使用shell脚本作为在测试期间或执行管理任务时发布消息的独立于语言的方式。 --topic标志指定要将消息发布到的主题。

接下来,通过运行以下命令验证kafka-console-producer.sh脚本是否已发布消息:

~/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic BackupTopic --from-beginning

~/ kafka /bin/kafka-console-consumer.sh shell脚本启动使用者。 一旦启动,它将订阅您在上一个命令中的"Test Message 1"消息中发布的主题的消息。 命令中的--from-beginning标志允许使用在启动使用者之前发布的消息。 如果未启用标记,则仅显示在启动使用者之后发布的消息。 运行该命令时,您将在终端中看到以下输出:

Test Message 1

CTRL+C以停止使用者。

您已经创建了一些测试数据并验证它是否持久化。 现在,您可以在下一节中备份状态数据。

第2步 - 备份ZooKeeper状态数据

在备份实际的Kafka数据之前,您需要备份存储在ZooKeeper中的群集状态。

ZooKeeper将其数据存储在~/ kafka /config/zookeeper.properties配置文件中dataDir字段指定的目录中。 您需要读取此字段的值以确定要备份的目录。 默认情况下, dataDir指向/ tmp/zookeeper目录。 如果安装中的值不同,请在以下命令中将/ tmp/zookeeper替换为该值。

以下是~/ kafka /config/zookeeper.properties文件的示例输出:

〜/Kafka/配置/ zookeeper.properties
...
...
...
# the directory where the snapshot is stored.
dataDir=/tmp/zookeeper
# the port at which the clients will connect
clientPort=2181
# disable the per-ip limit on the number of connections since this is a non-production config
maxClientCnxns=0
...
...
...

现在您已拥有目录的路径,您可以创建其内容的压缩归档文件。 压缩归档文件是常规归档文件的更好选择,以节省磁盘空间。 运行以下命令:

tar -czf /home/kafka/zookeeper-backup.tar.gz /tmp/zookeeper/*

命令的输出tar: Removing leading / from member names您可以放心地忽略。

-c-z标志告诉tar创建存档并将gzip压缩应用于存档。 -f标志指定输出压缩归档文件的名称,在本例中为zookeeper-backup.tar.gz

您可以在当前目录中运行ls ,以查看zookeeper-backup.tar.gz作为输出的一部分。

您现在已成功备份ZooKeeper数据。 在下一节中,您将备份实际的Kafka数据。

第3步 - 备份Kafka主题和消息

在本节中,您将把Kafka的数据目录备份到压缩的tar文件中,就像您在上一步中为ZooKeeper所做的那样。

Kafka将主题,消息和内部文件存储在log.dirs字段在~/ kafka /config/server.properties配置文件中指定的目录中。 您需要读取此字段的值以确定要备份的目录。 默认情况下,在当前安装中, log.dirs指向/tmp/kafka-logs目录。 如果安装中的值不同,请使用正确的值替换以下命令中的/tmp/kafka-logs

以下是~/ kafka /config/server.properties文件的示例输出:

〜/Kafka/配置/ server.properties
...
...
...
############################# Log Basics #############################

# A comma separated list of directories under which to store log files
log.dirs=/tmp/kafka-logs

# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=1

# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1
...
...
...

首先,停止Kafka服务,以便在使用tar创建存档时, log.dirs目录中的数据处于一致状态。 为此,请键入exit返回到服务器的非root用户,然后运行以下命令:

sudo systemctl stop kafka

停止Kafka服务后,以您的kafka用户身份重新登录:

sudo -iu kafka

作为非root sudo用户,必须停止/启动Kafka和ZooKeeper服务,因为在Apache Kafka安装先决条件中,您将kafka用户限制为安全预防措施。 先决条件中的此步骤禁用kafka用户的sudo访问,这会导致命令无法执行。

现在,通过运行以下命令创建目录内容的压缩归档文件:

tar -czf /home/kafka/kafka-backup.tar.gz /tmp/kafka-logs/*

再次,您可以安全地忽略命令的输出( tar: Removing leading / from member names )。

您可以在当前目录中运行ls ,以查看kafka-backup.tar.gz作为输出的一部分。

您可以再次启动Kafka服务 - 如果您不想立即恢复数据 - 通过键入exit ,切换到非root sudo用户,然后运行:

sudo systemctl start kafka

kafka用户身份重新登录:

sudo -iu kafka

您已成功备份Kafka数据。 您现在可以继续下一部分,在那里您将恢复存储在ZooKeeper中的集群状态数据。

第4步 - 恢复ZooKeeper数据

在本节中,您将恢复Kafka在用户执行操作(如创建主题,添加/删除其他节点以及添加和使用消息)时在内部创建和管理的群集状态数据。 您将通过删除ZooKeeper数据目录并恢复zookeeper-backup.tar.gz文件的内容将数据还原到现有源安装。 如果要将数据还原到其他服务器,请参阅第7步。

您需要停止Kafka和ZooKeeper服务,以防止在恢复过程中接收无效数据的数据目录。

首先,通过键入exit ,切换到非root sudo用户,然后运行以下命令来停止Kafka服务:

sudo systemctl stop kafka

接下来,停止ZooKeeper服务:

sudo systemctl stop zookeeper

kafka用户身份重新登录:

sudo -iu kafka

然后,您可以使用以下命令安全地删除现有的群集数据目录:

rm -r /tmp/zookeeper/*

现在恢复在第2步中备份的数据:

tar -C /tmp/zookeeper -xzf /home/kafka/zookeeper-backup.tar.gz --strip-components 2

-C标志告诉tar在提取数据之前切换到目录/ tmp/zookeeper 您指定--strip 2标志以使tar/ tmp/zookeeper /本身中提取存档的内容,而不是在其中的另一个目录(例如/tmp/zookeeper/tmp/zookeeper/ )中。

您已成功还原群集状态数据。 现在,您可以在下一节中继续进行Kafka数据恢复过程。

第5步 - 恢复Kafka数据

在本节中,您将通过删除Kafka数据目录并还原压缩的归档文件,将备份的Kafka数据还原到现有的源安装(如果您已遵循可选的第7步,则还原目标服务器)。 这将允许您验证还原是否成功。

您可以使用以下命令安全地删除现有的Kafka数据目录:

rm -r /tmp/kafka-logs/*

现在您已删除了数据,您的Kafka安装类似于全新安装,其中不存在任何主题或消息。 要恢复备份数据,请运行以下命令解压缩文件:

tar -C /tmp/kafka-logs -xzf /home/kafka/kafka-backup.tar.gz --strip-components 2

-C标志告诉tar在提取数据之前切换到目录/tmp/kafka-logs 指定--strip 2标志以确保存档的内容在/tmp/kafka-logs/本身中提取,而不是在其中的另一个目录(例如/tmp/kafka-logs/kafka-logs/ )中提取。

现在您已成功提取数据,您可以通过键入exit再次启动Kafka和ZooKeeper服务,切换到非root sudo用户,然后执行:

sudo systemctl start kafka

使用以下命令启动ZooKeeper服务:

sudo systemctl start zookeeper

kafka用户身份重新登录:

sudo -iu kafka

您已恢复kafka数据,您可以继续在下一部分中验证恢复是否成功。

第6步 - 验证恢复

要测试Kafka数据的恢复,您将使用您在第1步中创建的主题的消息。

等待几分钟让Kafka启动,然后执行以下命令从BackupTopic读取消息:

~/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic BackupTopic --from-beginning

如果您收到如下警告,则需要等待Kafka完全启动:

[2018-09-13 15:52:45,234] WARN [Consumer clientId=consumer-1, groupId=console-consumer-87747] Connection to node -1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)

再过几分钟重试上一个命令,或者运行sudo systemctl restart kafka作为非root sudo用户。 如果恢复中没有问题,您将看到以下输出:

Test Message 1

如果您没有看到此消息,则可以检查是否遗漏了上一节中的任何命令并执行它们。

现在您已经验证了已恢复的Kafka数据,这意味着您已在单个Kafka安装中成功备份和还原了数据。 您可以继续执行第7步,了解如何将群集和主题数据迁移到另一台服务器中的安装。

第7步 - 将备份迁移并恢复到另一个Kafka服务器(可选)

在本节中,您将备份的数据从源Kafka服务器迁移到目标Kafka服务器。 为此,您将首先使用scp命令将压缩的tar.gz文件下载到本地系统。 然后,您将再次使用scp将文件推送到目标服务器。 文件出现在目标服务器中后,您可以按照先前使用的步骤还原备份并验证迁移是否成功。

您正在本地下载备份文件,然后将它们上载到目标服务器,而不是直接从源服务器复制到目标服务器,因为目标服务器的/home/ sammy /.ssh/authorized_keys没有源服务器的SSH密钥/home/ sammy /.ssh/authorized_keys文件,无法连接到源服务器和从源服务器连接。 但是,本地计算机可以连接到两台服务器,从而为您设置从源服务器到目标服务器的SSH访问权限的额外步骤。

通过执行以下命令将zookeeper-backup.tar.gzkafka-backup.tar.gz文件下载到本地计算机:

scp sammy@source_server_ip:/home/kafka/zookeeper-backup.tar.gz .

你会看到类似于的输出:

zookeeper-backup.tar.gz                                                                                                  100%   68KB 128.0KB/s   00:00

现在运行以下命令将kafka-backup.tar.gz文件下载到本地计算机:

scp sammy@source_server_ip:/home/kafka/kafka-backup.tar.gz .

您将看到以下输出:

kafka-backup.tar.gz                                                                                                       100% 1031KB 488.3KB/s   00:02

在本地计算机的当前目录中运行ls ,您将看到两个文件:

kafka-backup.tar.gz zookeeper.tar.gz

运行以下命令将zookeeper-backup.tar.gz文件传输到目标服务器的/home/ kafka /

scp zookeeper-backup.tar.gz sammy@destination_server_ip:/home/sammy/zookeeper-backup.tar.gz

现在运行以下命令将kafka-backup.tar.gz文件传输到目标服务器的/home/ kafka /

scp kafka-backup.tar.gz sammy@destination_server_ip:/home/sammy/kafka-backup.tar.gz

您已成功将备份文件上载到目标服务器。 由于文件位于/home/ sammy /目录中,并且没有kafka用户访问的正确权限,因此您可以将文件移动到/home/ kafka /目录并更改其权限。

通过执行以下命令SSH到目标服务器:

ssh sammy@destination_server_ip

现在将zookeeper-backup.tar.gz移动到/home/ kafka /执行:

sudo mv zookeeper-backup.tar.gz /home/sammy/zookeeper-backup.tar.gz

同样,运行以下命令将kafka-backup.tar.gz复制到/home/ kafka /

sudo mv kafka-backup.tar.gz /home/kafka/kafka-backup.tar.gz

通过运行以下命令更改备份文件的所有者:

sudo chown kafka /home/kafka/zookeeper-backup.tar.gz /home/kafka/kafka-backup.tar.gz

以前的mvchown命令不会显示任何输出。

既然备份文件存在于目标服务器的正确目录中,请按照本教程的第4步到6中列出的命令来还原和验证目标服务器的数据。

结论

在本教程中,您从单独的服务器上的相同安装和安装中备份,导入和迁移了Kafka主题和消息。 如果您想了解有关Kafka中其他有用管理任务的更多信息,可以参考Kafka官方文档的操作部分。

要远程存储诸如zookeeper-backup.tar.gzkafka-backup.tar.gz等备份文件,您可以浏览DigitalOcean Spaces 如果Kafka是服务器上运行的唯一服务,您还可以探索其他备份方法,例如完整实例备份