In-depth source code analysis of Sqoop's migration process

Posted May 25, 2020 · 5 min read

**Abstract** Sqoop is a tool for efficiently transferring bulk data between Apache Hadoop and structured data stores (such as relational databases). This article briefly introduces the classes and methods involved in executing a Sqoop job and, combined with the MapReduce execution flow, analyzes how data migrates from the source to the destination.

Sqoop job execution process

Setting aside the MR execution process for now, Sqoop execution involves five key classes: Initializer, Partitioner, Extractor, Loader, and Destroyer. The execution flow is shown in the figure below.

  • Initializer: the initialization stage; validates the source data, initializes parameters, and so on;
  • Partitioner: splits the source data into partitions; the number of partitions is determined by the job's configured concurrency;
  • Extractor: starts the extractor thread and, according to the user configuration, writes source data into an in-memory queue;
  • Loader: starts the loader thread, which reads data from the queue and writes it to the destination;
  • Destroyer: resource cleanup; disconnects Sqoop from the data source and releases resources.

Therefore, every new connector must implement these five classes. A simplified model of how they fit together is sketched below.
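For intuition, here is a compact, self-contained sketch of that life cycle in plain Java. The interface names and the driver method are placeholders, not Sqoop's real connector SPI; they only model the order in which the five roles run.

import java.util.List;

// Simplified stand-ins for the five connector roles (not Sqoop's actual classes).
interface Initializer { void initialize(); }
interface Partitioner { List<String> getPartitions(int parallelism); }
interface Extractor   { void extract(String partition, Loader loader); }
interface Loader      { void load(Object[] record); }
interface Destroyer   { void destroy(); }

public class MigrationFlowSketch {
    // Runs the phases in the order a Sqoop job executes them.
    static void run(Initializer init, Partitioner partitioner,
                    Extractor extractor, Loader loader, Destroyer destroyer) {
        init.initialize();                                // 1. validate source, prepare resources
        for (String p : partitioner.getPartitions(10)) {  // 2. split the source data
            extractor.extract(p, loader);                 // 3 + 4. extract each split and load it
        }
        destroyer.destroy();                              // 5. disconnect and release resources
    }
}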

Initializer

Initializer is invoked before the Sqoop job is submitted to MR. It mainly prepares for the migration, such as connecting to the data source, creating temporary tables, and adding dependent jar packages. It is the first step of the Sqoop job life cycle; the main API is as follows:

public abstract void initialize(InitializerContext context, LinkConfiguration linkConfiguration, JobConfiguration jobConfiguration);

public List<String> getJars(InitializerContext context, LinkConfiguration linkConfiguration, JobConfiguration jobConfiguration) {
    return new LinkedList<String>();
}

public Schema getSchema(InitializerContext context, LinkConfiguration linkConfiguration, JobConfiguration jobConfiguration) {
    return new NullSchema();
}

The getSchema() method is used by the From or To side of the connector to match data when extracting or loading. For example, GenericJdbcConnector calls it to obtain the database name, table name, and field information of the source MySQL database.
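As a rough illustration of what that schema discovery involves for a JDBC source, the sketch below lists a table's columns via java.sql.DatabaseMetaData. It is not the GenericJdbcConnector implementation; the connection and table name are supplied by the caller.

import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.ResultSet;
import java.sql.SQLException;

public class SchemaSketch {
    // Prints column name and type for the given table using JDBC metadata,
    // roughly the information a JDBC-based getSchema() needs to collect.
    static void describeTable(Connection conn, String table) throws SQLException {
        DatabaseMetaData meta = conn.getMetaData();
        try (ResultSet cols = meta.getColumns(null, null, table, null)) {
            while (cols.next()) {
                System.out.println(cols.getString("COLUMN_NAME")
                        + " : " + cols.getString("TYPE_NAME"));
            }
        }
    }
}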

Destroyer

Destroyer is instantiated after the job finishes executing and is the last step of a Sqoop job. It cleans up tasks, deletes temporary tables, closes connections, and so on. The main API is as follows:

public abstract void destroy(DestroyerContext context,
                             LinkConfiguration linkConfiguration,
                             JobConfiguration jobConfiguration);
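A typical destroy() simply tears down whatever initialize() set up. The sketch below is a hypothetical example in plain JDBC (the staging-table name is a placeholder) that drops a temporary table and closes the connection.

import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;

public class DestroyerSketch {
    // Hypothetical cleanup: drop a temporary staging table and close the connection.
    static void destroy(Connection conn, String stagingTable) throws SQLException {
        try (Statement stmt = conn.createStatement()) {
            stmt.executeUpdate("DROP TABLE IF EXISTS " + stagingTable);
        } finally {
            conn.close();
        }
    }
}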

Partitioner

Partitioner creates the Partition objects; Sqoop creates 10 partitions by default. The main API is as follows:

public abstract List<Partition> getPartitions(PartitionerContext context,
                                              LinkConfiguration linkConfiguration,
                                              FromJobConfiguration jobConfiguration);
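For intuition, the following self-contained sketch (not Sqoop's GenericJdbcPartitioner) splits a numeric key range [min, max] into the requested number of contiguous partitions:

import java.util.ArrayList;
import java.util.List;

public class RangePartitionerSketch {
    // Splits [min, max] into numPartitions contiguous inclusive [lower, upper] ranges.
    static List<long[]> split(long min, long max, int numPartitions) {
        List<long[]> partitions = new ArrayList<>();
        long size = (max - min) / numPartitions + 1;
        for (long lower = min; lower <= max; lower += size) {
            long upper = Math.min(lower + size - 1, max);
            partitions.add(new long[] {lower, upper});
        }
        return partitions;
    }
}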

The Partition class defines the readFields() and write() methods so that partitions can be serialized and deserialized:

public abstract class Partition {
    public abstract void readFields(DataInput in) throws IOException;
    public abstract void write(DataOutput out) throws IOException;
    public abstract String toString();
}
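A concrete Partition usually just carries the boundaries of its split so they can be shipped to the Mapper. Below is a minimal sketch extending the abstract class above; the range-based partition itself is hypothetical, not an actual Sqoop class.

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

// Hypothetical partition carrying an inclusive numeric range [lower, upper].
public class RangePartitionSketch extends Partition {
    private long lower;
    private long upper;

    @Override
    public void readFields(DataInput in) throws IOException {
        lower = in.readLong();
        upper = in.readLong();
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(lower);
        out.writeLong(upper);
    }

    @Override
    public String toString() {
        return lower + " <= id AND id <= " + upper;
    }
}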

Extractor

The Extractor class extracts data from the source according to the partition and the configuration information, and writes it to a SqoopMapDataWriter. SqoopMapDataWriter is an inner class of SqoopMapper that extends the DataWriter class. It also wraps the SqoopWritable class to hold the data read from the source in an intermediate data format.

public abstract void extract(ExtractorContext context,
                             LinkConfiguration linkConfiguration,
                             JobConfiguration jobConfiguration,
                             SqoopPartition partition);

The core code of this method is as follows:

while (resultSet.next()) {
    ...
    context.getDataWriter().writeArrayRecord(array);
    ...
}
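Fleshing that loop out for a JDBC source, an extractor reads each row into an Object[] and hands it to the data writer. The sketch below is illustrative only; the ArrayWriter interface stands in for context.getDataWriter() and is not part of Sqoop's API.

import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

public class ExtractSketch {
    // Placeholder for the data writer obtained from the ExtractorContext.
    interface ArrayWriter { void writeArrayRecord(Object[] array); }

    static void extract(Connection conn, String query, ArrayWriter writer) throws SQLException {
        try (Statement stmt = conn.createStatement();
             ResultSet resultSet = stmt.executeQuery(query)) {
            int columns = resultSet.getMetaData().getColumnCount();
            while (resultSet.next()) {
                Object[] array = new Object[columns];
                for (int i = 0; i < columns; i++) {
                    array[i] = resultSet.getObject(i + 1);   // JDBC columns are 1-based
                }
                writer.writeArrayRecord(array);
            }
        }
    }
}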

Loader

The Loader receives data from the source and loads it into the destination. It must implement the following interface:

public abstract void load(LoaderContext context,
                          ConnectionConfiguration connectionConfiguration,
                          JobConfiguration jobConfiguration) throws Exception;

The load method reads from SqoopOutputFormatDataReader, which reads data in the intermediate data format representation and loads it into the destination data source. The Loader must call the DataReader iteratively until all data has been read:

while ((array = context.getDataReader().readArrayRecord()) != null) {
    ...
}
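On the destination side, the mirror image of the extract loop is a batched insert. The sketch below uses plain JDBC; the ArrayReader interface stands in for context.getDataReader() and the insert statement is a placeholder.

import java.sql.Connection;
import java.sql.PreparedStatement;

public class LoadSketch {
    // Placeholder for the data reader obtained from the LoaderContext.
    interface ArrayReader { Object[] readArrayRecord() throws Exception; }

    // Reads records until the reader returns null, then batch-inserts them.
    static void load(Connection conn, String insertSql, ArrayReader reader) throws Exception {
        try (PreparedStatement ps = conn.prepareStatement(insertSql)) {
            Object[] array;
            while ((array = reader.readArrayRecord()) != null) {
                for (int i = 0; i < array.length; i++) {
                    ps.setObject(i + 1, array[i]);
                }
                ps.addBatch();
            }
            ps.executeBatch();
        }
    }
}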

MapReduce execution process

The previous section set aside the MR execution process and described migration only from the perspective of the Extractor and Loader. The following walks through a Sqoop migration job in detail together with the MR execution flow.

Initialization

1) During the job initialization phase, SqoopInputFormat splits the source data:

  • The getSplits method of SqoopInputFormat calls the getPartitions method of the Partitioner class;
  • The returned Partition list is wrapped into SqoopSplit objects;
  • The default number of splits is 10.

Each Partition is then handed to a Mapper for execution; each Mapper starts an extractor thread and a loader thread to migrate the data, roughly in the producer-consumer pattern sketched below.
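Conceptually this is a producer-consumer pair connected by an in-memory queue. The self-contained sketch below models that handoff with a BlockingQueue and a poison-pill end marker; it is a simplification, not the actual SqoopOutputFormatLoadExecutor synchronization.

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class ExtractLoadQueueSketch {
    private static final Object[] END_OF_DATA = new Object[0];   // poison pill

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<Object[]> queue = new ArrayBlockingQueue<>(1024);

        // "Extractor" thread: produces records and signals completion.
        Thread extractor = new Thread(() -> {
            try {
                for (int i = 0; i < 5; i++) {
                    queue.put(new Object[] {i, "row-" + i});
                }
                queue.put(END_OF_DATA);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        // "Loader" thread: consumes records until the end marker arrives.
        Thread loader = new Thread(() -> {
            try {
                Object[] record;
                while ((record = queue.take()) != END_OF_DATA) {
                    System.out.println("loaded " + record[1]);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        extractor.start();
        loader.start();
        extractor.join();
        loader.join();
    }
}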

Mapper

2) Mapper process during job execution:

  • SqoopMapper contains a SqoopMapDataWriter inner class;
  • Mapper's run() calls the Extractor.extract method, which iteratively obtains the source data and then calls the DataWriter to write it into the Context.

private class SqoopMapDataWriter extends DataWriter {

    ...
    private void writeContent() {
        ...
        context.write(writable, NullWritable.get()); // writable here is a SqoopWritable object
        ...
    }
    ...

}

Note: in the KV pair stored in the Context here, K is a SqoopWritable and V is just an empty Writable object (NullWritable). SqoopWritable implements write and readFields for serialization and deserialization.
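As a rough model of that key type, a Writable-style class that only carries the intermediate CSV text could look like the sketch below (simplified, using java.io directly rather than Hadoop's Writable interface; it is not the actual SqoopWritable source):

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

// Simplified model of a writable key holding the intermediate (CSV text) payload.
public class CsvTextWritableSketch {
    private String csvText = "";

    public void write(DataOutput out) throws IOException {
        out.writeUTF(csvText);          // serialize for the map output
    }

    public void readFields(DataInput in) throws IOException {
        csvText = in.readUTF();         // deserialize on the consuming side
    }

    public void setString(String text) { csvText = text; }
    public String getString()          { return csvText; }
}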

Reducer

3) Reduce process during the job execution phase:

  • SqoopOutputFormatLoadExecutor wraps three inner classes: SqoopOutputFormatDataReader, SqoopRecordWriter, and ConsumerThread;
  • SqoopNullOutputFormat creates a ConsumerThread when getRecordWriter is called; the code is as follows

public RecordWriter<SqoopWritable, NullWritable> getRecordWriter() {
    executorService = Executors.newSingleThreadExecutor(...);
    consumerFuture = executorService.submit(new ConsumerThread(context));
    return writer;
}

  • ConsumerThread implements the Runnable interface, and the Loader.load(...) method is called inside the thread. This method uses the DataReader to iteratively read SqoopWritable records from the Context via the intermediate data format and write them to the destination database.

private class ConsumerThread implements Runnable {

    ...
    public void run() {
        ...
        loader.load(loaderContext, connectorLinkConfig, connectorToJobConfig);
        ...
    }
    ...

}

Notes:

  • In local mode, when Sqoop submits a task without setting SqoopReducer.class, MR uses a default reducer class.
  • setContent corresponds to SqoopRecordWriter.write(...), which takes the data out of the SqoopWritable and stores it in the intermediate storage format, namely IntermediateDataFormat; correspondingly, getContent reads data back out of the intermediate storage format.
  • Sqoop defines a pluggable intermediate data format as the abstract class IntermediateDataFormat; SqoopWritable wraps this abstract class to hold the intermediate data. A simplified model of this abstraction is sketched below.
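A minimal model of that pluggable abstraction, assuming only what the notes above describe (a typed payload plus text-form accessors), might look like this; it is not Sqoop's actual IntermediateDataFormat source:

// Simplified model of a pluggable intermediate data format.
public abstract class IntermediateFormatSketch<T> {
    protected T data;                       // the record in this format's native representation

    public void setData(T data) { this.data = data; }
    public T getData()          { return data; }

    // Every concrete format must be able to convert to and from a common text form.
    public abstract String getTextData();
    public abstract void setTextData(String text);
}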

The above covers the classes and methods involved in Sqoop job execution; I hope it helps you in your data migration work.

