Kafka operations

http://blog.sina.com.cn/s/blog_628cc2b70102wdsb.html  exception troubleshooting

https://jingyan.baidu.com/article/eb9f7b6d367679869364e8d4.html  Kafka administration

# Delete every topic listed in /opt/tmp.topics: shorten retention so old
# segments are purged quickly, set the cleanup policy to delete, then
# mark the topic itself for deletion.
while read line
do
    echo "$line"
    kafka-configs.sh --zookeeper localhost:2181 --alter --entity-name "$line" --entity-type topics --add-config retention.ms=1000
    kafka-configs.sh --zookeeper localhost:2181 --alter --entity-name "$line" --entity-type topics --add-config cleanup.policy=delete
    kafka-topics.sh --delete --zookeeper localhost:2181 --topic "$line"
done < /opt/tmp.topics
kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic qycloud-18.qycloud.nanjinganyuankeji_workflow     check message count

kafka-topics.sh --list --zookeeper localhost:2181    list topics

kafka-topics.sh --describe --zookeeper localhost:2181 --topic $line        show topic details

kafka-run-class.sh kafka.tools.ConsumerOffsetChecker  --group $group    check consumer offsets
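
ConsumerOffsetChecker is deprecated and was removed in newer Kafka releases; on 0.10+ brokers the same offset/lag view comes from kafka-consumer-groups.sh. A minimal sketch, assuming a broker on localhost:9092 and the group name in $group:

<code>

# Shows current offset, log-end offset, and lag per partition
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group $group

</code>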

Deleting Kafka topics

References:

http://www.bubuko.com/infodetail-2859810.html

https://blog.csdn.net/russle/article/details/82881297

https://blog.csdn.net/dianyueneo/article/details/37527087

https://blog.csdn.net/little_fxc/article/details/98494263

<code>

# Danger: the backtick expansion deletes every topic the broker knows about
kafka-topics.sh --delete --zookeeper localhost:2181 --topic `kafka-topics.sh --list --zookeeper localhost:2181`

# Per-topic variant driven by a file of topic names
while read line
do
    echo "$line"
    kafka-configs.sh --zookeeper localhost:2181 --alter --entity-name "$line" --entity-type topics --add-config retention.ms=1000
    kafka-configs.sh --zookeeper localhost:2181 --alter --entity-name "$line" --entity-type topics --add-config cleanup.policy=delete
    kafka-topics.sh --delete --zookeeper localhost:2181 --topic "$line"
done < /opt/tmp.topics2

</code>

Additional Kafka notes

http://www.kafka.cc/archives/252.html   message contents

https://blog.csdn.net/qq_30498935/article/details/82219365 dynamically changing a topic's retention period

kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name test --alter --add-config retention.ms=864000000
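
To confirm the override took effect, the per-topic settings can be read back with the same tool (a small sketch, reusing the topic name test from above):

<code>

# Lists the dynamic config overrides currently set on the topic
kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name test --describe

</code>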

https://blog.csdn.net/whp15369657805/article/details/78729128  changing Kafka configuration in Spring Boot

https://www.cnblogs.com/lihao7/p/9264000.html  modifying Kafka offsets
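
On Kafka 0.11+, consumer offsets can also be rewound straight from the CLI. A hedged sketch; the group and topic names here (test-group, test) are placeholders:

<code>

# Dry-run first; swap --dry-run for --execute to actually move the offsets
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group test-group --topic test --reset-offsets --to-earliest --dry-run

</code>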

https://www.cnblogs.com/hongfeng2019/archive/2019/07/18/11210229.html changing the replication factor
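
The replication factor is changed by reassigning partitions with an explicit replica list. A minimal sketch, assuming a topic test with a single partition 0 and broker IDs 1 and 2; the JSON file name is hypothetical:

<code>

# increase-replication.json (hypothetical):
# {"version":1,"partitions":[{"topic":"test","partition":0,"replicas":[1,2]}]}
kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file increase-replication.json --execute

</code>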

Kafka cleanup policies

https://www.orchome.com/23    Kafka compaction policy

https://blog.csdn.net/u013256816/article/details/80487758   the Kafka compaction process

https://www.cnblogs.com/51python/p/10966757.html  Kafka parameters explained

https://blog.csdn.net/abc123lzf/article/details/100738169  Kafka cleanup policies

http://kafka.apache.org/documentation/#topicconfigs  official Kafka topic configs

https://blog.csdn.net/Curry_lee_3/article/details/102382743   Kafka compaction in practice
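
As the links above describe, cleanup.policy chooses between delete (purge segments by time/size) and compact (keep only the latest record per key). A sketch of switching an existing topic to compaction, again using the topic name test:

<code>

# Keep only the most recent value for each message key
kafka-configs.sh --zookeeper localhost:2181 --alter --entity-type topics --entity-name test --add-config cleanup.policy=compact

</code>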


Java (6): Flink and Kafka

https://www.cnblogs.com/yougewe/p/11371676.html  stream processing example

https://blog.csdn.net/learn_tech/article/details/81115996  Kafka operations with pykafka

Producing messages with pykafka

from pykafka import KafkaClient

# hosts accepts a comma-separated list of brokers
client = KafkaClient(hosts="127.0.0.1:9092")
topic = client.topics['mastertest'.encode()]  # pick a topic

message = "2019-08-01 17:39:32,P0001,channel1,201908010116100001,100"
# With a topic handle, create a synchronous producer and send the message as a string
with topic.get_sync_producer() as producer:
    producer.produce(message.encode())
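
To verify the message landed, read the topic back with the console consumer (assuming a broker on localhost:9092):

<code>

# Reads mastertest from the beginning; Ctrl-C to stop
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic mastertest --from-beginning

</code>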


Processing messages with Flink in Java

package com.aijava.flink.kafka;

import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

import java.util.Properties;
import java.util.concurrent.TimeUnit;

/**
 * A Kafka consumer written in Java.
 */
public class ConsumeKafkaByJava {

    private static final String CONSUMER_GROUP_ID = "test.flink.consumer1";

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // env.enableCheckpointing(1000);

        Properties kafkaProps = new Properties();
        kafkaProps.setProperty("bootstrap.servers", KafkaConstantProperties.KAFKA_BROKER);
        kafkaProps.setProperty("group.id", CONSUMER_GROUP_ID);

        FlinkKafkaConsumer010<String> myConsumer = new FlinkKafkaConsumer010<String>(
                KafkaConstantProperties.FLINK_COMPUTE_TOPIC_IN1,
                new SimpleStringSchema(),
                kafkaProps);

        DataStream<String> dataStream = env.addSource(myConsumer);
        // Tuple4 fields: order ID, aggregation dimension key, order count, order amount
        DataStream<Tuple4<String, String, Integer, Double>> counts = dataStream
                .flatMap(new TestBizDataLineSplitter())
                .keyBy(1)
                .timeWindow(Time.of(30, TimeUnit.SECONDS))
                .reduce((value1, value2) -> {
                    return new Tuple4<>(value1.f0, value1.f1, value1.f2 + value2.f2, value1.f3 + value2.f3);
                });

        // For now the sink simply mirrors the computed stream back to Kafka
        counts.addSink(new FlinkKafkaProducer010<>(
                KafkaConstantProperties.FLINK_DATA_SINK_TOPIC_OUT1,
                new KafkaTuple4StringSchema(),
                kafkaProps)
        );
        // Print both streams for inspection
        dataStream.print();
        counts.print();
        env.execute("Test Count from Kafka data");
    }

}