博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
kafka_java
阅读量:6805 次
发布时间:2019-06-26

本文共 4313 字,大约阅读时间需要 14 分钟。

使用kafka在eclipe上可以设计生产着 和消费者

例子一
将本地文件上传到kafka上然后通过设计kafka的消费者取回到本地


上传到kafka上需要

KafkaProducerproducer;
Properties;//kafka的链接需要初始化数据这里需要properties将所需的东西以字符串的形式写在properties文件中所需东西不多且不会修改的情况下可以直接写在类里面.
FileInputStream 以字节流的方式传入到kafka

package com.ocean.kafka;import java.io.BufferedReader;import java.io.File;import java.io.FileInputStream;import java.io.FileNotFoundException;import java.io.FileReader;import java.io.IOException;import java.io.InputStream;import java.io.InputStreamReader;import java.io.Reader;import java.util.Properties;import org.apache.kafka.clients.producer.KafkaProducer;import org.apache.kafka.clients.producer.ProducerRecord;public class HomeWork {    private KafkaProducer
producer; private Properties properties; // 初始化数据 public HomeWork() { properties = new Properties(); properties.put("bootstrap.servers", "master:9092,slave1:9092,slave2:9092"); properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); properties.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer"); producer = new KafkaProducer
(properties); } // 指定发送到的topic public void assignPartitionSend(String key, byte[] value) { ProducerRecord
record = new ProducerRecord
("home-work_pic",0, key, value); producer.send(record); } // 准备数据 public void preparLocalData() throws IOException { File file = new File("C:\\Users\\Administrator\\Desktop\\psb.jpg"); FileInputStream fis =new FileInputStream(file); byte[] context = new byte[1024]; int a = 0; int length=0; while ((length=fis.read(context))!=-1) {//这里要注意得判断读取内容的实际长度 如果不这样设置 回到多出来//很多空格如果是图片的话则取回时无法还原 byte[] newbyte =new byte[length]; System.arraycopy(context, 0, newbyte, 0, length); assignPartitionSend("TIMES" + a, newbyte); a++; } fis.close(); } public void close() { producer.flush(); producer.close(); } public static void main(String[] args) throws IOException { HomeWork homeWork = new HomeWork(); try { homeWork.preparLocalData(); } catch (IOException e) { e.printStackTrace(); } homeWork.close(); }}

将数据传到Kafka上之后要想查看 可以通过zookeeper的客户端但是什么都看不懂因为是字节流(需要注意的一点是文件传到kafka上分区为0不然的话就会出现文件对应不上 )

最后想看的话就是取回来 下面的累就是将kafka上的文件数据传回到本地

package com.ocean.kafka;import java.io.File;import java.io.FileOutputStream;import java.io.IOException;import java.util.ArrayList;import java.util.List;import java.util.Properties;import org.apache.kafka.clients.consumer.ConsumerRecord;import org.apache.kafka.clients.consumer.ConsumerRecords;import org.apache.kafka.clients.consumer.KafkaConsumer;public class HomeWork2 {    private Properties properties = new Properties();    private KafkaConsumer
consumer; public HomeWork2() { properties = new Properties(); properties.put("bootstrap.servers", "master:9092"); properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); properties.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer"); properties.setProperty("group.id", "home-work_pic"); consumer = new KafkaConsumer
(properties); } public void getfile() throws IOException { File file =new File("C:\\Users\\Administrator\\Desktop\\output.jpg"); FileOutputStream fileOutputStream =new FileOutputStream(file,true); List
topics =new ArrayList
(); topics.add("home-work_pic"); consumer.subscribe(topics); while(true){ ConsumerRecords
records =consumer.poll(1000); for (ConsumerRecord
record : records) { if(record.value()!=null){ System.out.println(record.value()); byte[] b =record.value(); fileOutputStream.write(b); fileOutputStream.flush(); } } } } public static void main(String[] args) { HomeWork2 homeWork2 =new HomeWork2(); try { homeWork2.getfile(); } catch (IOException e) { e.printStackTrace(); } }}

这就是一个简单的设置将文件以字节流的方式上传和下载从kafka上

转载地址:http://cenwl.baihongyu.com/

你可能感兴趣的文章
win10远程桌面连接错误
查看>>
RH134 UNIT5
查看>>
解析linux根文件系统的目录树
查看>>
onTouchEvent事件中调用onFling方法
查看>>
我的友情链接
查看>>
linux shell
查看>>
Windows Server入门系列35 了解NTFS安全权限
查看>>
序列判断
查看>>
【C语言】【笔试题】实现一个函数int my_atoi(char s[]),可以将一个字符串转换为对应的整数。...
查看>>
php常用自定义函数
查看>>
150809112 杨磊
查看>>
如何取消开机按ctrl+alt+delete组合键
查看>>
Linux操作系统的主要用途是什么
查看>>
jmeter之使用fidder抓包之后进行接口测试
查看>>
进程调度模拟程序
查看>>
Linux自建简易版DNS主从服务器
查看>>
linux 7.2 安装openstack 过程出现rabbitmq-server 错误解决方法
查看>>
PHP CI框架学习笔记-分页实现程序
查看>>
提升不止一点点,Dubbo 3.0 预览版详细解读,还愣着干啥啊?进来啊
查看>>
磁盘配额(quota)
查看>>