bootstrap-server
required by production and consumption.test
. This topic is used as an example below to describe how to consume messages.pom.xml
as follows:<?xml version="1.0" encoding="UTF-8"?><project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>org.example</groupId><artifactId>Test-CKafka</artifactId><version>1.0-SNAPSHOT</version><dependencies><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>0.10.2.2</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-simple</artifactId><version>1.7.25</version><scope>compile</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>1.6.1</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_2.11</artifactId><version>1.6.1</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka_2.11</artifactId><version>1.7.0</version></dependency></dependencies><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>3.3</version><configuration><source>1.8</source><target>1.8</target></configuration></plugin></plugins></build></project>
import org.apache.flink.api.common.serialization.SimpleStringSchema;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;import java.util.Properties;public class CKafkaConsumerDemo {public static void main(String args[]) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();Properties properties = new Properties();//Domain name address for public network access, i.e., public routing address, which can be obtained in the access mode module of the instance details page.properties.setProperty("bootstrap.servers", "IP:PORT");//Consumer group ID.properties.setProperty("group.id", "testConsumerGroup");DataStream<String> stream = env.addSource(new FlinkKafkaConsumer<>("topicName", new SimpleStringSchema(), properties));stream.print();env.execute();}}
import org.apache.flink.api.common.serialization.SimpleStringSchema;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;import java.util.Properties;public class CKafkaConsumerDemo {public static void main(String args[]) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();Properties properties = new Properties();//Domain name address for public network access, i.e., public routing address, which can be obtained in the access mode module of the instance details page.properties.setProperty("bootstrap.servers", "IP:PORT");//Consumer group ID.properties.setProperty("group.id", "testConsumerGroup");properties.setProperty("security.protocol", "SASL_PLAINTEXT");properties.setProperty("sasl.mechanism", "PLAIN");//Username and password. The username is not the one on the console, but needs to be concatenated as “instanceId#username” instead.properties.setProperty("sasl.jaas.config","org.apache.kafka.common.security.plain.PlainLoginModule required\\nusername=\\"yourinstanceId#yourusername\\"\\npassword=\\"yourpassword\\";");properties.setProperty("sasl.kerberos.service.name","kafka");DataStream<String> stream = env.addSource(new FlinkKafkaConsumer<>("topicName", new SimpleStringSchema(), properties));stream.print();env.execute();}}
Was this page helpful?