1:采用BucketingSink的方式
public class BucketingSinkDemo {
public static void main(String[] args) throws Exception {
long rolloverInterval = 2 * 60 * 1000;
long batchSize = 1024 * 1024 * 100;
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
System.setProperty("HADOOP_USER_NAME", "hadoop");
String topic = "ods_lark_order";
Properties prop = new Properties();
prop.setProperty("bootstrap.servers","ip:port");
prop.setProperty("group.id","groupid");
FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), prop);
kafkaConsumer.setStartFromGroupOffsets();//默认消费策略
DataStreamSource<String> source = env.addSource(kafkaConsumer);
//
BucketingSink<String> hadoopSink = new BucketingSink<>("hdfs://ip:port/flink/order_sink");
// HDFS的配置
Configuration configuration = new Configuration();
// 1.能够指定block的副本数
configuration.set("dfs.replication","1");
hadoopSink.setFSConfig(configuration);
// 2.指定分区文件夹的命名
hadoopSink.setBucketer(new DateTimeBucketer<>("yyyy-MM-dd--HH", ZoneId.of("Asia/Shanghai")));
// 3.指定块大小和时间间隔生成新的文件
hadoopSink.setBatchSize(batchSize);
hadoopSink.setBatchRolloverInterval(rolloverInterval);
// 4.指定生成文件的前缀,后缀,正在运行文件前缀
hadoopSink.setPendingPrefix("order_sink");
hadoopSink.setPendingSuffix("");
hadoopSink.setInProgressPrefix(".in");
source.addSink(hadoopSink);
env.execute();
}
}
采用这种方式的好处:
1.能够指定block的副本数
2.指定分区文件夹的命名
3.指定块大小和时间间隔生成新的文件
4.指定生成文件的前缀,后缀,正在运行文件前缀
缺点:
该方法已经过期,新版建议采用StreamingFileSink,笔者第一次找到该类发现能够写入成功,但是没有找到如何能够对写入HDFS进行压缩,比如parquet或者orc
2:采用StreamingFileSink的方式-行编码【forRowFormat】
public class StreamingFileSinkForRowFormatDemo {
public static void main(String[] args) throws Exception {
//获取Flink的运行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
System.setProperty("HADOOP_USER_NAME", "hadoop");
String topic = "ods_lark_order";
Properties prop = new Properties();
prop.setProperty("bootstrap.servers","ip:port");
prop.setProperty("group.id","first");
FlinkKafkaConsumer<String> myConsumer = new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), prop);
myConsumer.setStartFromGroupOffsets();//默认消费策略
DataStreamSource<String> source = env.addSource(myConsumer);
// 自定义滚动策略
DefaultRollingPolicy<String, String> rollPolicy = DefaultRollingPolicy.builder()
.withRolloverInterval(TimeUnit.MINUTES.toMillis(2))/*每隔多长时间生成一个文件*/
.withInactivityInterval(TimeUnit.MINUTES.toMillis(5))/*默认60秒,未写入数据处于不活跃状态超时会滚动新文件*/
.withMaxPartSize(128 * 1024 * 1024)/*设置每个文件的最大大小 ,默认是128M*/
.build();
// 输出文件的前、后缀配置
OutputFileConfig config = OutputFileConfig
.builder()
.withPartPrefix("prefix")
.withPartSuffix(".txt")
.build();
StreamingFileSink<String> streamingFileSink = StreamingFileSink
.forRowFormat(new Path("hdfs://192.168.1.204:9000/flink/data/"),new SimpleStringEncoder<String>("UTF-8") )
.withBucketAssigner(new DateTimeBucketAssigner<>())
// 设置指定的滚动策略
.withRollingPolicy(rollPolicy)
// 桶检查间隔,这里设置为1s
.withBucketCheckInterval(1)
// 指定输出文件的前、后缀
.withOutputFileConfig(config)
.build();
source.addSink(streamingFileSink);
env.execute("StreamingFileSinkTest");
}
}
采用这种方式的好处:
1.能够指定block的副本数
2.指定分区文件夹的命名
3.指定块大小和时间间隔生成新的文件
4.指定生成文件的前缀,后缀,正在运行文件前缀
缺点:
由于是按照行进行的,所以不能进行压缩
3:采用StreamingFileSink的方式-bucket压缩 【forBulkFormat】
public class StreamingFileSinkDemo {
public static void main(String[] args) throws Exception {
//获取Flink的运行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// checkpoint配置
env.enableCheckpointing(60000);
System.setProperty("HADOOP_USER_NAME", "hadoop");
String topic = "ods_lark_order";
Properties prop = new Properties();
prop.setProperty("bootstrap.servers","ip:port");
prop.setProperty("group.id","first");
// 获取流
FlinkKafkaConsumer<String> myConsumer = new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), prop);
myConsumer.setStartFromGroupOffsets();
DataStreamSource<String> source = env.addSource(myConsumer);
DataStream<Order> nameDS = source.map(new MapFunction<String, Order>() {
@Override
public Order map(String s) throws Exception {
Order order = new Order();
JSONObject jsonObject = JSONObject.parseObject(s);
order.setName(jsonObject.getString("name"));
return order;
}
});
// 1.输出文件的前、后缀配置
OutputFileConfig config = OutputFileConfig
.builder()
.withPartPrefix("prefix")
.withPartSuffix(".txt")
.build();
// 设置为Parquet的压缩方式
StreamingFileSink<Order> streamingFileSink = StreamingFileSink
.forBulkFormat(new Path("hdfs://192.168.1.204:9000/flink/data/"), ParquetAvroWriters.forReflectRecord(Order.class))
/*这里是采用默认的分桶策略DateTimeBucketAssigner,它基于时间的分配器,每小时产生一个桶,格式如下yyyy-MM-dd--HH*/
.withBucketAssigner(new DateTimeBucketAssigner<>())
.withRollingPolicy(OnCheckpointRollingPolicy.build())
.withOutputFileConfig(config)
.build();
nameDS.addSink(streamingFileSink);
env.execute("StreamingFileSinkTest");
}
}
采用这种方式的好处:
1.输出文件的前、后缀配置
2.设置为Parquet的压缩方式
缺点:
文件生成是通过checkpoint时候触发的,当checkpoint 过于频繁的话会生成很多的小文件,同时任务数过多,也会生成很多小文件,涉及到后续的小文件合并的情况