package com.itiaoling.app.ods;
import com.alibaba.fastjson.JSONObject; import com.itiaoling.bean.batch.GoodsEbayCompatibilityModel; import lombok.extern.slf4j.Slf4j; import org.apache.doris.flink.cfg.DorisExecutionOptions; import org.apache.doris.flink.cfg.DorisOptions; import org.apache.doris.flink.cfg.DorisReadOptions; import org.apache.doris.flink.cfg.DorisSink; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.SqlDialect; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import java.util.Properties;
public class EscMysql2DorisBatch {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration(); conf.setString("execution.checkpointing.interval", "10000"); conf.setString("execution.checkpointing.mode", "EXACTLY_ONCE"); conf.setString("state.checkpoints.num-retained", "2"); conf.setString("state.checkpoints.dir", "hdfs:/ns/flink/checkpoints/esc_buyer1_batch"); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
env.setParallelism(10);
EnvironmentSettings Settings = EnvironmentSettings.newInstance().inStreamingMode().build();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, Settings); tEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
tEnv.executeSql( "CREATE TABLE test_multi_jdbc( `id` BIGINT COMMENT '', `dbctime` TIMESTAMP(3) COMMENT '', `dbutime` TIMESTAMP(3) COMMENT '', `extension` STRING COMMENT '') COMMENT 'null' WITH ( 'connector' = 'multi-jdbc', 'table-name' = 'test_table.*', 'schema-name' = 'test_database.*', 'username' = '${username}', 'password' = '${password}', 'scan.partition.column' = 'id', 'scan.batch.size' = '100000', 'url' = 'jdbc:mysql://${connection1};jdbc:mysql://${connection2}' )
Table table = tEnv.sqlQuery("select t.id," + "t.site," + "t.category_id," + "t.esc_category_id," + "t.model_id," + "t.version," + "t.update_type," + "DATE_FORMAT(t.create_time, 'yyyy-MM-dd HH:mm:ss') as create_time," + "DATE_FORMAT(t.update_time, 'yyyy-MM-dd HH:mm:ss') as update_time," + "t.deleted," + "0 as is_deleted_flg, " + "1673853468700 as ts_ms " + "from goods_ebay_compatibility_model t " );
DataStream<GoodsEbayCompatibilityModel> ordersDataStream = tEnv.toDataStream(table, GoodsEbayCompatibilityModel.class); SingleOutputStreamOperator<Object> map = ordersDataStream.map(new MapFunction<GoodsEbayCompatibilityModel, Object>() { @Override public Object map(GoodsEbayCompatibilityModel goodsEbayCompatibilityModel) throws Exception { return JSONObject.toJSON(goodsEbayCompatibilityModel).toString(); } });
// map.print();
Properties sinkPproperties = new Properties(); sinkPproperties.setProperty("format", "json"); sinkPproperties.setProperty("strip_outer_array", "true");
map.addSink( DorisSink.sink( DorisReadOptions.builder().build(), DorisExecutionOptions.builder() .setBatchSize(100000) .setBatchIntervalMs(20000l) .setMaxRetries(3) .setStreamLoadProp(sinkPproperties).build(), DorisOptions.builder() .setFenodes("XXX:8030,XXX:8030,XXX:8030") .setTableIdentifier("ods.ods_esc_buyer_goods_ebay_compatibility_model_bak") .setUsername("XXX") .setPassword("XXX") .build() ));
env.execute("mysql-sink-doris-batch"); } }
|