如何在Ubuntu 16.04上使用Transporter将已转换的数据从MongoDB同步到Elasticsearch

占位符

介绍

Transporter是一个开源工具,用于跨不同数据存储区移动数据。 开发人员经常编写一次性脚本来执行任务,例如跨数据库移动数据,将数据从文件移动到数据库,反之亦然,但使用像Transporter这样的工具有几个优点。

在Transporter中,您可以构建管道 ,它定义从 (读取数据的位置)到接收 (数据写入位置)的数据流。 源和汇可以是SQL或NoSQL数据库,平面文件或其他资源。 Transporter使用可插拔扩展的适配器与这些资源进行通信,并且该项目默认包含用于常用数据库的多个适配器

除了移动数据之外,Transporter还允许您在使用变压器穿过管道时更改数据。 像适配器一样,默认情况下包含几个变压器 您也可以编写自己的变压器来自定义数据的修改。

在本教程中,我们将介绍使用Transporter的内置适配器和用JavaScript编写的自定义转换器将数据从MongoDB数据库移动和处理到Elasticsearch的示例。

先决条件

要学习本教程,您需要:

运输管道是用JavaScript编写的。 本教程不需要任何JavaScript知识或经验,但您可以在这些JavaScript教程中了解更多信息。

第1步 - 安装运输车

Transporter为大多数常见操作系统提供二进制文件。 Ubuntu的安装过程包括两个步骤:下载Linux二进制文件并使其可执行。

首先,从GitHub的Transporter最新版本页面获取最新版本的链接。 复制以-linux-amd64结尾的链接。 本教程使用v0.5.2,这是写作时最新的。

将二进制文件下载到您的主目录。

cd
wget https://github.com/compose/transporter/releases/download/v0.5.2/transporter-0.5.2-linux-amd64

将其移至/usr/local/bin或您的首选安装目录。

mv transporter-*-linux-amd64 /usr/local/bin/transporter

然后使其可执行,以便可以运行它。

chmod +x /usr/local/bin/transporter

您可以通过运行二进制文件来测试Transporter是否正确设置。

transporter

您将看到使用帮助输出和版本号:

USAGE
  transporter <command> [flags]

COMMANDS
  run       run pipeline loaded from a file
  . . .

VERSION
  0.5.2

为了使用Transporter将数据从MongoDB移动到Elasticsearch,我们需要两件事:我们要移动的MongoDB中的数据以及告诉Transporter如何移动它的管道。 下一步创建一些示例数据,但如果您已经有了一个想要移动的MongoDB数据库,则可以跳过下一步并直接进入第3步。

第2步 - 将示例数据添加到MongoDB(可选)

在这一步中,我们将在MongoDB中创建一个包含单个集合的示例数据库,并为该集合添加一些文档。 然后,在本教程的其余部分中,我们将使用Transporter管道迁移和转换此示例数据。

首先,连接到你的MongoDB数据库。

mongo

这会将您的提示改为mongo> ,表示您正在使用MongoDB shell。

从这里,选择一个数据库来处理。 我们将调用我们的my_application

use my_application

MongoDB ,您不需要显式创建数据库或集合。 一旦开始将数据添加到按名称选择的数据库中,该数据库将自动创建。

因此,要创建my_application数据库,请将两个文档保存到其users集合中:其中一个代表Sammy Shark,一个代表Gilly Glowfish。 这将是我们的测试数据。

db.users.save({"firstName": "Sammy", "lastName": "Shark"});
db.users.save({"firstName": "Gilly", "lastName": "Glowfish"});

添加完文档后,您可以查询users集合以查看您的记录。

db.users.find().pretty();

输出看起来类似于下面的输出,但_id列将会不同。 MongoDB自动添加对象ID来唯一标识集合中的文档。

{
  "_id" : ObjectId("59299ac7f80b31254a916456"),
  "firstName" : "Sammy",
  "lastName" : "Shark"
}
{
  "_id" : ObjectId("59299ac7f80b31254a916457"),
  "firstName" : "Gilly",
  "lastName" : "Glowfish"
}

CTRL+C退出MongoDB shell。

接下来,我们创建一个Transporter管道将这些数据从MongoDB移动到Elasticsearch。

第3步 - 创建基本管道

Transporter中的pipeline.js默认由一个名为pipeline.js的JavaScript文件定义。 给定源和接收器,内置init命令会在正确的目录中创建基本配置文件

使用MongoDB作为源和Elasticsearch作为接收器初始化一个入门级pipeline.js

transporter init mongodb elasticsearch

您将看到以下输出:

Writing pipeline.js...

你不需要为这一步修改pipeline.js ,但让我们看看它是如何工作的。

该文件看起来像这样,但您也可以使用命令cat pipeline.jsless pipeline.js (按q退出)或使用您最喜欢的文本编辑器打开文件来查看文件的内容。

pipeline.js
var source = mongodb({
  "uri": "${MONGODB_URI}"
  // "timeout": "30s",
  // "tail": false,
  // "ssl": false,
  // "cacerts": ["/path/to/cert.pem"],
  // "wc": 1,
  // "fsync": false,
  // "bulk": false,
  // "collection_filters": "{}",
  // "read_preference": "Primary"
})

var sink = elasticsearch({
  "uri": "${ELASTICSEARCH_URI}"
  // "timeout": "10s", // defaults to 30s
  // "aws_access_key": "ABCDEF", // used for signing requests to AWS Elasticsearch service
  // "aws_access_secret": "ABCDEF" // used for signing requests to AWS Elasticsearch service
  // "parent_id": "elastic_parent" // defaults to "elastic_parent" parent identifier for Elasticsearch
})

t.Source("source", source, "/.*/").Save("sink", sink, "/.*/")

var sourcevar sink开头的行分别为MongoDB和Elasticsearch适配器定义JavaScript变量 我们将定义这些适配器稍后在此步骤中需要的MONGODB_URIELASTICSEARCH_URI环境变量。

//开头的行是注释。 它们突出显示了一些可以为流水线设置的常见配置选项,但我们并未将它们用于我们在此创建的基本流水线。

最后一行连接源和接收器。 可变transportert让我们访问我们的管道。 我们使用.Source().Save() 函数使用之前在文件中定义的source变量和sink变量来添加源和汇。

Source()Save()函数的第三个参数是namespace. /.*/作为最后一个参数传递意味着我们要从MongoDB传输所有数据并将其保存在Elasticsearch中的相同名称空间下。

在我们运行这个管道之前,我们需要为MongoDB URIElasticsearch URI设置环境变量 在我们使用的示例中,两者都使用默认设置本地托管,但如果您使用现有的MongoDB或Elasticsearch实例,请确保您自定义这些选项。

export MONGODB_URI='mongodb://localhost/my_application'
export ELASTICSEARCH_URI='http://localhost:9200/my_application'

现在我们准备好运行管道了。

transporter run pipeline.js

你会看到像这样结束的输出:

. . .
INFO[0001] metrics source records: 2                     path=source ts=1522942118483391242
INFO[0001] metrics source/sink records: 2                path="source/sink" ts=1522942118483395960
INFO[0001] exit map[source:mongodb sink:elasticsearch]   ts=1522942118483396878

在第二行和倒数第三行中,此输出表明源中有2条记录,而2条记录已移至接收器。

要确认这两个记录都已处理完毕,可以查询Elasticsearch以查找my_application数据库的内容,该数据库现在应该存在。

curl $ELASTICSEARCH_URI/_search?pretty=true

?pretty=true参数使输出更易于阅读:

{
  "took" : 5,
  "timed_out" : false,
  "_shards" : {
    "total" : 5,
    "successful" : 5,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : 2,
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "my_application",
        "_type" : "users",
        "_id" : "5ac63e9c6687d9f638ced4fe",
        "_score" : 1.0,
        "_source" : {
          "firstName" : "Gilly",
          "lastName" : "Glowfish"
        }
      },
      {
        "_index" : "my_application",
        "_type" : "users",
        "_id" : "5ac63e986687d9f638ced4fd",
        "_score" : 1.0,
        "_source" : {
          "firstName" : "Sammy",
          "lastName" : "Shark"
        }
      }
    ]
  }
}

MongoDB中的数据库和集合类似于Elasticsearch中的索引和类型。 考虑到这一点,你应该看到:

  • _index字段设置为my_application,即原始MongoDB数据库的名称)。
  • users,设置的_type字段是MongoDB集合的名称。
  • firstNamelastName字段分别填写“Sammy”,“Shark”和“Gilly”,“Glowfish”。

这证实来自MongoDB的记录都通过Transporter成功处理并加载到Elasticsearch。 为了建立在这个基本流水线上,我们将添加一个可以转换输入数据的中间处理步骤。

第4步 - 创建一个变压器

顾名思义, 变换器在将源数据加载到接收器之前修改源数据。 例如,它们允许您添加新字段,删除字段或更改字段的数据。 运输车带有一些预定义的变压器以及对定制变压器的支持。

通常,自定义转换器被编写为JavaScript函数并保存在单独的文件中。 要使用它们,可以在pipeline.js添加对变压器文件的引用。 Transporter包括Otto和Goja JavaScript引擎。 因为Goja更新更快,我们将在这里使用它。 唯一的功能区别是语法。

创建一个名为transform.js的文件,我们将使用它来编写我们的转换函数。

nano transform.js

以下是我们将使用的函数,它将创建一个名为fullName的新字段,其值将由一个空格(如Sammy Shark )分隔的firstNamelastName字段连接在一起。

transform.js
function transform(msg) {
    msg.data.fullName = msg.data.firstName + " " + msg.data.lastName;
    return msg
}

我们来看看这个文件的内容:

  • function transform(msg),的第一行是函数定义
  • msg是一个包含源文档详细信息的JavaScript对象 我们使用这个对象来访问通过管道的数据
  • 该函数的第一行连接两个现有字段并将该值分配给新的fullName字段。
  • 该函数的最后一行返回要使用的其余管道的新修改的msg对象。

保存并关闭文件。

接下来,我们需要修改管道来使用这个变压器。 打开pipeline.js文件进行编辑。

nano pipeline.js

在最后一行中,我们需要添加对Transform()函数的调用,以将Transform()添加到调用Source()Save()之间的管道中,如下所示:

〜/转运/ pipeline.js
. . .
t.Source("source", source, "/.*/")
.Transform(goja({"filename": "transform.js"}))
.Save("sink", sink, "/.*/")

传递给Transform()的参数是Transform()类型,在这种情况下是Goja。 使用goja函数,我们使用相对路径指定变换器的文件名。

保存并关闭文件。 在重新运行流水线来测试变压器之前,让我们先从前面的测试中清除Elasticsearch中的现有数据。

curl -XDELETE $ELASTICSEARCH_URI

你会看到这个输出确认命令的成功。

{"acknowledged":true}

现在重新运行管道。

transporter run pipeline.js

输出看起来与之前的测试非常相似,并且您可以在最后几行看到管道是否像以前一样成功完成。 可以肯定的是,我们可以再次检查Elasticsearch以查看数据是否以我们预期的格式存在。

curl $ELASTICSEARCH_URI/_search?pretty=true

您可以在新输出中看到fullName字段:

{
  "took" : 9,
  "timed_out" : false,
  "_shards" : {
    "total" : 5,
    "successful" : 5,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : 2,
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "my_application",
        "_type" : "users",
        "_id" : "5ac63e9c6687d9f638ced4fe",
        "_score" : 1.0,
        "_source" : {
          "firstName" : "Gilly",
          "fullName" : "Gilly Glowfish",
          "lastName" : "Glowfish"
        }
      },
      {
        "_index" : "my_application",
        "_type" : "users",
        "_id" : "5ac63e986687d9f638ced4fd",
        "_score" : 1.0,
        "_source" : {
          "firstName" : "Sammy",
          "fullName" : "Sammy Shark",
          "lastName" : "Shark"
        }
      }
    ]
  }
}

请注意fullName字段已经在两个文档中添加了正确设置的值。 有了这个,现在我们知道如何将自定义转换添加到传输器管道。

结论

您已经构建了一个带有转换器的基本Transporter管道,用于将数据从MongoDB复制并修改为Elasticsearch。 您可以以相同的方式应用更复杂的转换,在同一管道中链接多个转换,等等。 MongoDB和Elasticsearch只是Transporter支持的两个适配器。 它还支持平面文件,SQL数据库(如Postgres)以及许多其他数据源。

您可以查看GitHub上Transporter项目,以便随时了解API中的最新变化,并访问Transporter维基以获取有关如何使用适配器,变压器和Transformer其他功能的更多详细信息。