<?xml version="1.0" encoding="UTF-8"?><project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>org.example</groupId><artifactId>Test-CKafka</artifactId><version>1.0-SNAPSHOT</version><dependencies><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>0.10.2.2</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-simple</artifactId><version>1.7.25</version><scope>compile</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>1.6.1</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_2.11</artifactId><version>1.6.1</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka_2.11</artifactId><version>1.7.0</version></dependency></dependencies><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>3.3</version><configuration><source>1.8</source><target>1.8</target></configuration></plugin></plugins></build></project>
import org.apache.flink.api.common.serialization.SimpleStringSchema;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;import java.util.Properties;public class CKafkaConsumerDemo {public static void main(String args[]) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();Properties properties = new Properties();//公网接入域名地址,即公网路由地址,在实例详情页的接入方式模块获取。properties.setProperty("bootstrap.servers", "IP:PORT");//消费者组id。properties.setProperty("group.id", "testConsumerGroup");DataStream<String> stream = env.addSource(new FlinkKafkaConsumer<>("topicName", new SimpleStringSchema(), properties));stream.print();env.execute();}}
import org.apache.flink.api.common.serialization.SimpleStringSchema;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;import java.util.Properties;public class CKafkaConsumerDemo {public static void main(String args[]) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();Properties properties = new Properties();//公网接入域名地址,即公网路由地址,在实例详情页的接入方式模块获取。properties.setProperty("bootstrap.servers", "IP:PORT");//消费者组id。properties.setProperty("group.id", "testConsumerGroup");properties.setProperty("security.protocol", "SASL_PLAINTEXT");properties.setProperty("sasl.mechanism", "PLAIN");//用户名和密码,注:用户名是需要拼接,并非控制台的用户名:instanceId#username。properties.setProperty("sasl.jaas.config","org.apache.kafka.common.security.plain.PlainLoginModule required\\nusername=\\"yourinstanceId#yourusername\\"\\npassword=\\"yourpassword\\";");properties.setProperty("sasl.kerberos.service.name","kafka");DataStream<String> stream = env.addSource(new FlinkKafkaConsumer<>("topicName", new SimpleStringSchema(), properties));stream.print();env.execute();}}
本页内容是否解决了您的问题?