NiFi - Querying Multiple Databases using the DBCPConnectionPoolLookup Controller Service


NiFi, formerly called NiagaraFiles, is simply a tool to automate the data flow between two or more systems. To simplify this definition, I will quote an example:

Suppose you have two systems: a source, let's say an Oracle Database, with a table you want to replicate (CDC) to another destination, say Amazon S3. This can be achieved very easily in NiFi by batching the data consumed from the Oracle Database and ingesting it into Amazon S3 storage in the form of a file (which can be any format like Avro, CSV, Parquet, etc.).

In this article I will solve a problem which most of us usually face.

Problem Statement:

        I have multiple databases (let's say 100 different databases) where the requirement is to execute the same query against each one and retrieve the data. Why would someone want to run the same query across so many different databases? One answer is to look up some metadata from each database and to capture user login information, basically to capture audit data.

Once the data is captured from each database, I also want to ingest it into my audit history table.

How to solve this problem with NiFi:

This problem is solvable using the "DBCPConnectionPoolLookup" controller service, which can map multiple database names to their connection pools so the same query can be executed against each database in a loop.
It may look simple at first, but it gets a little complicated with the multiple fields and different processors one has to use to achieve the result.

Following is the flow I have used to perform the action; I will walk you through it step by step:


In this example I have used MySQL for the demo, as it was easy to create multiple databases in the same instance, but you can use any other database like Oracle and capture the database names from the OEM repository.

Let's start by creating a "DBCPConnectionPool" controller service for each database. In this example I have created 5 databases (A, B, C, D and E) in MySQL and added a DBCPConnectionPool controller service for each.


Note: Later in the blog, I have created an UpdateAttribute processor where I am adding a new attribute named "database.name"; this is a mandatory attribute from which the "DBCPConnectionPoolLookup" controller service picks up the name of the database against which the audit query should run.
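
The lookup service itself holds no connection details; it simply maps lookup keys (the possible values of the database.name attribute) to the individual DBCPConnectionPool services. A minimal sketch of how it could be configured for this demo follows; the controller-service names on the right are placeholders for the five services created above:

DBCPConnectionPoolLookup
    A : DBCPConnectionPool-A
    B : DBCPConnectionPool-B
    C : DBCPConnectionPool-C
    D : DBCPConnectionPool-D
    E : DBCPConnectionPool-E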

Connection details for one of the DBCPConnectionPool services are as follows:
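
The exact values were shown in the screenshot; a typical configuration for database A on a local MySQL instance would look roughly like the sketch below. The URL, driver location and credentials are placeholders, and the driver class name depends on the Connector/J version you use:

DBCPConnectionPool (database A)
    Database Connection URL     : jdbc:mysql://localhost:3306/A
    Database Driver Class Name  : com.mysql.cj.jdbc.Driver
    Database Driver Location(s) : /path/to/mysql-connector-java.jar
    Database User               : <user>
    Password                    : <password>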



Each of the above databases has one table "T" with a single column "a":

Script as follows:

Create Databases:

Create database A;
Create database B;
Create database C;
Create database D;
Create database E;

Create Tables:
create table A.T(a int);
create table B.T(a int);
create table C.T(a int);
create table D.T(a int);
create table E.T(a int);

Insert unique records into each table to ensure uniqueness:

insert into A.T values(1);
insert into B.T values(2);
insert into C.T values(3);
insert into D.T values(4);
insert into E.T values(5);
commit;


Create a Destination Table where you want to insert the audit data coming from different databases:

Create table A.C ( a int ) ;   -- In this case I have reused database A to create the destination table C.

Start Creating the Flow:

Processor 1: ExecuteSQL

This processor selects the list of databases from the metastore against which you would want to run the common audit query in a loop:



Note: This SQL query returns schema_name, which is actually the name of a database created in the MySQL instance.
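
The query itself was in the screenshot above; against MySQL it would be along the lines of the following, where the IN list simply restricts the result to the five demo databases created earlier:

select schema_name
from   information_schema.schemata
where  schema_name in ('A', 'B', 'C', 'D', 'E');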

Processor 2: ConvertAvroToJSON

This processor is required to convert the Avro records returned by the first processor to JSON.

Processors 3 and 4: SplitJson and EvaluateJsonPath

These processors are used to first split the JSON returned by the previous processor, which becomes important when multiple values (database names) are returned; EvaluateJsonPath is then used to capture the schema_name from the JSON and store it in the database.name attribute.
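
The exact property values were shown in the screenshots; a minimal sketch of how these two processors might be configured is below. The JSON field name ($.SCHEMA_NAME) is an assumption and has to match the column returned by the query in Processor 1:

SplitJson
    JsonPath Expression : $.*

EvaluateJsonPath
    Destination   : flowfile-attribute
    database.name : $.SCHEMA_NAME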

Processor 5: UpdateAttribute

This processor is required to add an attribute named database.name, from which the DBCPConnectionPoolLookup controller service picks up the name of the database and selects the corresponding DBCPConnectionPool controller service, so the SQL can be executed in a loop over the different DB names returned by the ExecuteSQL processor in step 1.

Processor 6: ExecuteSQL

This processor contains the audit statement you want to execute in each database, the output of which is later stored in the audit history table.

Note: the Database Connection Pooling Service property refers to the DBCPConnectionPoolLookup controller service, and the SQL select query property contains the audit query to execute.
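
In this demo the "audit query" is just a select on the demo table; in a real setup it would be whatever statement pulls the login/audit data you need. A sketch of the two relevant ExecuteSQL properties, with the query being my assumed demo statement rather than the one from the screenshot:

ExecuteSQL
    Database Connection Pooling Service : DBCPConnectionPoolLookup
    SQL select query                    : select a from T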

The rest is straightforward: Processors 7, 8 and 9 are ConvertAvroToJSON, ConvertJSONToSQL and PutSQL, which build and execute the insert statements for the data returned by the audit queries executed in the different databases.
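
Here again the screenshots carry the exact values; the part that usually needs attention is ConvertJSONToSQL, which must point at the destination table created earlier. A rough sketch follows, where the connection pool is the plain DBCPConnectionPool for database A (since the destination table lives there) and the table name is the demo table C:

ConvertJSONToSQL
    JDBC Connection Pool : DBCPConnectionPool (database A)
    Statement Type       : INSERT
    Table Name           : C

PutSQL
    JDBC Connection Pool : DBCPConnectionPool (database A)

ConvertJSONToSQL emits a parameterised statement such as INSERT INTO C (a) VALUES (?), which PutSQL then executes against the destination database.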

Please find the complete template and download it from here:

Run_Queries_in_Multiple_Databases.xml

Hope this article comes in useful to you. Please leave your comments.





Comments

  1. Hi Deepak,

    Nice explanation. I had a similar use case, but the difference is that I am using the PutDatabaseRecord processor and providing it the DBCPConnectionPoolLookup service. I have 2 databases and both have the same table A, but one column name is different: A1 has LoadDate and A2 has FileLoadDate. The Avro schemas also have different column names (i.e. A1's Avro has LoadDate and A2's Avro has FileLoadDate). When the first flowfile triggers the PutDatabaseRecord processor, it inserts the records fine. But when the second flowfile with A2's database.name comes, it fails saying the LoadDate column (which is in A1's Avro) is not defined, or something like that. This might be related to the PutDatabaseRecord processor caching the attributes, I am not sure.

    Can you help me?

  2. Have you tried RouteOnAttribute to redirect the flow?

  3. Run_Queries_in_Multiple_Databases.xml does not exist

    Replies
    1. It seems I have moved it to a different location; I shall update it shortly.

    2. Please share the updated path of Flow XML

