1. Введение
Когда система имеет большое количество данных, которые необходимо импортировать из базы данных, использование Spring Parath может повысить эффективность импорта. Spring Paration использует ItemReader для чтения страниц данных, а Itempriter записывает данные партиями. Поскольку Spring Parath не предоставляет Elastisearch Itempriter и Itemreader, в этом примере пользовательский Elasticsearchitemwriter (ElasticSearchItemreader) используется для импорта партии.
2. Пример
2.1 pom.xml
В этой статье используется Spring Data Jest для подключения ES (вы также можете использовать Spring Data Elasticsearch для подключения ES), а версия ES - 5.5.3
<? xml version = "1.0" Encoding = "utf-8"?> <project xmlns = "http://maven.apache.org/pom/4.0.0" xmlns: xsi = "http://www.w3.org/2001/xmlschema-instance" xsi: schemalocation = "http://maven.apache.org/pom/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <dolidyversion> 4.0.0 </modelversion> <groupid> com.hfcsbc. <artifactid> es-etl </artifactid> <sersion> 0.0.1-snapshot </version> <badegaging> jar </packaging> <mame> es-etl </name> <socription> демо-проект для Spring Boot </description> <parent> <groupid> org.springframework.boot </GroupId> <sterfactid> spring-boot-startter-ppact-ppact-prame-pact-pact-pact-pramefartward. <serse> 2.0.0.m7 </version> <venterativePath/> <!-Lookup Parent из репозитория-> </parent> <properties> <project.build.sourceencoding> UTF-8 </Project.Build.SourceEncoding> <Project.Ropting.OutputEnting> utf-8 </project. <java.version> 1.8 </java.version> </properties> <depertiencies> <depertion> <groupid> org.springframework.boot </GroupId> <artifactid> spring-boot-starter </artifactid> </dependency> <dehyed> <groupid> org.spramework. <Artifactid> Spring-boot-starter-data-jpa </artifactid> </dependency> <depertiency> <groupid> org.postgresql </GroupId> <artifactid> postgresql </artifactid> </gropement> <dehyed> <groupicid> org.sprameform.boot </Group> <Artifactid> spring-boot-starter-batch </artifactid> </dependency> <depervice> <groupid> com.github.vanroy </groupid> <artifactid> spring-boot-starter-data-jest </artifactid> <sersive> 3.0.0.release </version> </зависимость> <зависимость> <Group> IIDSHORCHBORESKBORESHBORESHBORESERSBORESHARESBORESHARDESHARDESHARDSERID> IID. <TrifactId> jest </artifactid> <sersive> 5.3.2 </version> </dependency> <depertion> <groupid> org.projectlombok </groupid> <artifactid> lombok </artifactid> </degyed> <dehyed> <groupid> org.spramework.boot </GroupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> </dependencies> <build> <plugins> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> <repositories> <repository> <id> Spring-snapshots </id> <mame> Spring Snapshots </name> <url> https://repo.spring.io/snapshot </url> <plosithots> <Nabled> <url>https://repo.spring.io/milestone</url> <snapshots> <enabled>false</enabled> </snapshots> </repository> </repository> </repository> </repository> </pluginRepository> <pluginRepository> <id>spring-snapshots</id> <name>Spring Snapshots</name> <URL> https://repo.spring.io/snapshot </url> <sonposshots> <Nabled> true </enabled> </spanshots> </pluginRepository> <pluginRepository> <id> Spring-milestones </id> <mame> Spring Wilestones </name> <URL> https://repo.spring.io/milestone </url> <plyshots> <Nabled> false </enabled> </snapshots> </pluginRepository> </pluginRepository> </pluginRepository> </pluginrepository> </pluginRepositories> </project> </pluginRepository> </pluginRepositories>
2.2 Класс объектов и репозиторий
пакет com.hfcsbc.esetl.domain; import lombok.data; import org.springframework.data.elasticsearch.annotation.document; import org.springframework.data.elasticsearch.annotations.field; import.spramework.data.alsearch.annotations.field; org.springframework.data.elasticsearch.annotations.fieldtype; импорт javax.persistence.entity; import javax.persistence.id; импорт javax.persistence.onetoone;/** * Создать Pengchao на 2018/23/23 */@docum refreshinterval = "-1")@entity@datapublic class person {@id private long long; Приватное название строки; @Onetoone @field (type = fieldtype.nested) Частный адрес адреса;} пакет com.hfcsbc.esetl.domain; import lombok.data; import javax.persistence.entity; import javax.persistence.id;/** * Создание Pengchao на 2018/2/23 */@entity@DataPublic Class Адрес {@ID Private Long ID; Приватное строковое имя;}пакет com.hfcsbc.esetl.repository.jpa; import com.hfcsbc.esetl.domain.person; import org.springframework.data.jpa.repository.jparepository;/** * Create By PengChao на 2018/2/23 */public extrapository vestends jporepos overpostory jopepos o grongep
package com.hfcsbc.esetl.repository.es;import com.hfcsbc.esetl.domain.Person;import org.springframework.data.elasticsearch.repository.ElasticsearchRepository;/** * Create by pengchao on 2018/2/23 */public interface EsPersonRepository extends ElasticsearchRepository<Person, Long> {}2.3 Настройка ElasticSearchItemWriter
пакет com.hfcsbc.esetl.itemwriter; import com.hfcsbc.esetl.repository.es.espersonrepository; import com.hfcsbc.esetl.domain.person; импорт org.springframework.batch.core.exitstatus; импорт org.spramework.bathritemer.corembrow.corembor.corembor. org.springframework.batch.core.stepexecution; импорт org.springframework.batch.core.stepexecutionlistener; import org.springframework.batch.item.itemwriter; import java.util.list;/** * Create by Pengchao на 2018/23/23 */Pupellastemwitmestemptisments Itempriter <derson>, itemWriteListener <Cerson>, StepExecutionListener {private EspersonRepository PersonRepository; public elasticsearchitemwriter (EspersonRepository PersonRepository) {this.personRepository = personRepository; } @Override public void beforeWrite(List<? extends Person> items) { } @Override public void afterWrite(List<? extends Person> items) { } @Override public void onWriteError(Exception exception, List<? extends Person> items) { } @Override public void beforeStep(StepExecution stepExecution) { } @Override public ExitStatus после шага (Stepexecution Stepexecution) {return null; } @Override public void write (List <? Extends Person> элементы) бросает Exception {// метод Saveall, который реализует класс AbstractElasticSearchRepository вызывает ElasticSearchoperations.bulkindex (запросы), который является пакетным индексом PersonRepository.saveall (элементы); }}2.4 Настройка ElasticSearchItemReader (этот пример не используется, только для справки)
пакет com.hfcsbc.esetl.itemreader; import org.springframework.batch.item.data.abstractPaginationDataiteMreader; импорт org.springframework.beans.factory.initializebean; import org.spramework.data.lasticsearch.coresearchaporations; org.springframework.data.elasticsearch.core.query.searchquery; import java.util.iterator;/** * Создать Pengchao на 2018/2/24 */public class elasticsearchitemreader <Personals> расширяет AbstractPaginatedDataiteMreader <Person> Implomations elasticabization ElasticsPoporations; Частный финальный запрос SearchQuery; Частный финальный класс <? расширяет человек> targettype; public elasticsearchitemreader (elasticsearchoperations elasticsearchoperations, searchquery Query, Class <? Extends Person> TargetType) {this.elasticSearchoperations = ElasticSearchOperations; this.query = Query; this.TargetType = targetType; } @Override защищен итератор <person> dopageread () {return (iterator <person>) elasticsearchoperations.queryforlist (Query, targetType) .iterator (); } @Override public void afterpropertiesset () бросает исключение {}}2.5 Конфигурация, необходимая для настройки пружинной партии
пакет com.hfcsbc.esetl.config; import com.hfcsbc.esetl.itemwriter.elasticsearchitemwriter; import com.hfcsbc.esetl.repository.es.espersonRepository; import com.hfcsbc.esetl.domain.person; importRamework.batch. org.springframework.batch.core.step; импорт org.springframework.batch.core.configuration.annotation.enablebatchprocessing; импорт org.springframework.batch.core.configuration.annotation.jobuilderfactory; import org.springframework.batch.core.configuration.annotation.stepbuilderFactory; импорт org.springframework.batch.core.launch.support.runidincrementer; импорт org.springframework.batch.core.repository.jobrepository; import org.springframework.batch.core.repository.support.jobrepositoryfactorybean; import org.springframework.batch.item.itemreader; import org.springframework.batch.item.itemwriter; imporm.spramework.batch.item.dapase.jpagearderite.jpage org.springframework.batch.item.database.orm.jpanativequeryprovider; import org.springframework.beans.factory.annotation.autowired; import org.springframework.context.nantation.bean; импорт org.spramework.annotation.contectoration; org.springframework.transaction.PlatformTransactionManager;import javax.persistence.EntityManagerFactory;import javax.sql.DataSource;/** * Create by pengchao on 2018/2/23 */@Configuration@EnableBatchProcessingpublic class BatchConfig { @Autowired private EsPersonRepository personRepository; @Bean public itemreader <derss> orderitemreader (EntityManagerFactory EntityManagerFactory) {jPapagingItemReader <Cervision> reader = new jPapagingItemReader <derns> (); String sqlquery = "select * from Person"; try {jpanativequeryprovider <person> QueryProvider = new jPanativeQueryProvider <derss> (); QueryProvider.setsqlQuery (sqlquery); QueryProvider.SetentityClass (Person.class); QueryProvider.afterPropertiesset (); reader.setentityManagerFactory (EntityManagerFactory); reader.setPagesize (10000); reader.setQueryProvider (QueryProvider); reader.afterpropertiesset (); reader.setsavestate (true); } catch (Exception e) {e.printstackTrace (); } return Reader; } @Bean public elasticsearchitemwriter itempriter () {return new elasticsearchitemwriter (personRepository); } @Bean Public Step Шаг (StepBuilderFactory StepBuilderFactory, itemReader ItemReader, Itempriter itempriter) {return stepbuilderfactory .get ("step1") .Chunk (10000) .Reader (itemReader) .writer (itempriter) .build (); } @Bean public Job (jobbuilderfactory jobbuilderfactory, step) {return jobbuilderfactory .get ("importJob") .incrementer (new Runidincrementer ()) .flow (step) .end () .build (); } /*** Когда будет выполнена пружинная партия, будут созданы некоторые таблицы, которые ей нужны. Вот место для создания таблицы: DataSource * @param DataSource * @param Manager * @return */ @bean public jobRepository jobRepository (DataSource DataSource, PlatformTransactionManager Manager) {JobRepositoryBectorybean jobRepositoryBactorybean = new JobRepoSitoryFactorybean (); JobRepositoryFactoryBean.SetDataSource (DataSource); JobRepositoryFactoryBean.SetTransActionManager (Manager); JobRepositoryFactoryBean.SetDataBaseType ("postgres"); try {return jobRepositoryFactoryBean.getObject (); } catch (Exception e) {e.printstackTrace (); } return null; }}2.6 Настройте адрес подключения базы данных и ES
Весна: Redis: Host: 192.168.1.222 Данные: Jest: URI: http://192.168.1.222:9200 Имя пользователя: Эластичный пароль: Changeme JPA: Database: Postgresql Show-Sql: True Hibernate: DDL-Auto: Обновление: платформа: PostGRE: POSTGRE: POSTGRE: POSTGRE: POSTGRE: POSTGRE: POSTGRE: POSTGRE: POSTGRE: POSTGRS: jdbc: postgresql: //192.168.1.222: 5433/person username: hfcb Пароль: HFCB-класс-класс-имени: org.postgresql.driver max-active: 2spring.batch.initialize-schema: Всегда
2.7 Настройте класс входа
package com.hfcsbc.esetl;import org.springframework.boot.SpringApplication;import org.springframework.boot.autoconfigure.SpringBootApplication;import org.springframework.boot.autoconfigure.data.elasticsearch.ElasticsearchAutoConfiguration;import org.springframework.boot.autoconfigure.data.elasticsearch.elasticsearchdataautoconfiguration; импорт org.springframework.data.elasticsearch.repository.config.enableLasticsearchRepositories; import org.springframework.data.jpa.repository.config.enablejparePositories; @springbootapplication (exclide = {elasticsearchautoconfiguration.class, elasticsearchdatautoconfiguration.class})@enable -earchrepositories (basepackages = class})@enablec "com.hfcsbc.esetl.repository")@enablejparepositories (basepackages = "com.hfcsbc.esetl.repository.jpa") public class esetlapplication {public static void main (string [] args) {springapplication.run (eSetlApplication.class, args); }}Выше всего содержание этой статьи. Я надеюсь, что это будет полезно для каждого обучения, и я надеюсь, что все будут поддерживать Wulin.com больше.