2019-01-29 22:23:41 +00:00
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
2024-01-16 10:41:11 -05:00
2019-01-29 22:23:41 +00:00
// snippet-start:[kinesisanalytics.java.example.streams-sink]
package com.amazonaws.services.kinesisanalytics ;
import com.amazonaws.services.kinesisanalytics.runtime.KinesisAnalyticsRuntime ;
import org.apache.flink.api.common.serialization.SimpleStringSchema ;
import org.apache.flink.streaming.api.datastream.DataStream ;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment ;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer ;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer ;
import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants ;
import java.io.IOException ;
import java.util.Map ;
import java.util.Properties ;
public class StreamingJob {
private static final String region = " us-east-1 " ;
private static final String inputStreamName = " ExampleInputStream " ;
private static final String outputStreamName = " ExampleOutputStream " ;
private static DataStream < String > createSourceFromStaticConfig ( StreamExecutionEnvironment env ) {
Properties inputProperties = new Properties ( ) ;
inputProperties . setProperty ( ConsumerConfigConstants . AWS_REGION , region ) ;
inputProperties . setProperty ( ConsumerConfigConstants . STREAM_INITIAL_POSITION , " LATEST " ) ;
return env . addSource ( new FlinkKinesisConsumer < > ( inputStreamName , new SimpleStringSchema ( ) , inputProperties ) ) ;
}
2024-01-16 10:41:11 -05:00
2019-01-29 22:23:41 +00:00
private static DataStream < String > createSourceFromApplicationProperties ( StreamExecutionEnvironment env )
throws IOException {
Map < String , Properties > applicationProperties = KinesisAnalyticsRuntime . getApplicationProperties ( ) ;
return env . addSource ( new FlinkKinesisConsumer < > ( inputStreamName , new SimpleStringSchema ( ) ,
applicationProperties . get ( " ConsumerConfigProperties " ) ) ) ;
}
private static FlinkKinesisProducer < String > createSinkFromStaticConfig ( ) {
Properties outputProperties = new Properties ( ) ;
outputProperties . setProperty ( ConsumerConfigConstants . AWS_REGION , region ) ;
outputProperties . setProperty ( " AggregationEnabled " , " false " ) ;
FlinkKinesisProducer < String > sink = new FlinkKinesisProducer < > ( new SimpleStringSchema ( ) , outputProperties ) ;
sink . setDefaultStream ( outputStreamName ) ;
sink . setDefaultPartition ( " 0 " ) ;
return sink ;
}
2024-01-16 10:41:11 -05:00
2019-01-29 22:23:41 +00:00
private static FlinkKinesisProducer < String > createSinkFromApplicationProperties ( ) throws IOException {
Map < String , Properties > applicationProperties = KinesisAnalyticsRuntime . getApplicationProperties ( ) ;
FlinkKinesisProducer < String > sink = new FlinkKinesisProducer < > ( new SimpleStringSchema ( ) ,
applicationProperties . get ( " ProducerConfigProperties " ) ) ;
sink . setDefaultStream ( outputStreamName ) ;
sink . setDefaultPartition ( " 0 " ) ;
return sink ;
}
public static void main ( String [ ] args ) throws Exception {
// set up the streaming execution environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment . getExecutionEnvironment ( ) ;
/*
* if you would like to use runtime configuration properties, uncomment the
* lines below
* DataStream<String> input = createSourceFromApplicationProperties(env);
*/
DataStream < String > input = createSourceFromStaticConfig ( env ) ;
/*
* if you would like to use runtime configuration properties, uncomment the
* lines below
* input.addSink(createSinkFromApplicationProperties())
*/
2024-01-16 10:41:11 -05:00
2019-01-29 22:23:41 +00:00
input . addSink ( createSinkFromStaticConfig ( ) ) ;
env . execute ( " Flink Streaming Java API Skeleton " ) ;
}
}
// snippet-end:[kinesisanalytics.java.example.streams-sink]