本文共 4313 字,大约阅读时间需要 14 分钟。
使用kafka在eclipe上可以设计生产着 和消费者
例子一将本地文件上传到kafka上然后通过设计kafka的消费者取回到本地上传到kafka上需要
KafkaProducerproducer;Properties;//kafka的链接需要初始化数据这里需要properties将所需的东西以字符串的形式写在properties文件中所需东西不多且不会修改的情况下可以直接写在类里面.FileInputStream 以字节流的方式传入到kafkapackage com.ocean.kafka;import java.io.BufferedReader;import java.io.File;import java.io.FileInputStream;import java.io.FileNotFoundException;import java.io.FileReader;import java.io.IOException;import java.io.InputStream;import java.io.InputStreamReader;import java.io.Reader;import java.util.Properties;import org.apache.kafka.clients.producer.KafkaProducer;import org.apache.kafka.clients.producer.ProducerRecord;public class HomeWork { private KafkaProducerproducer; private Properties properties; // 初始化数据 public HomeWork() { properties = new Properties(); properties.put("bootstrap.servers", "master:9092,slave1:9092,slave2:9092"); properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); properties.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer"); producer = new KafkaProducer (properties); } // 指定发送到的topic public void assignPartitionSend(String key, byte[] value) { ProducerRecord record = new ProducerRecord ("home-work_pic",0, key, value); producer.send(record); } // 准备数据 public void preparLocalData() throws IOException { File file = new File("C:\\Users\\Administrator\\Desktop\\psb.jpg"); FileInputStream fis =new FileInputStream(file); byte[] context = new byte[1024]; int a = 0; int length=0; while ((length=fis.read(context))!=-1) {//这里要注意得判断读取内容的实际长度 如果不这样设置 回到多出来//很多空格如果是图片的话则取回时无法还原 byte[] newbyte =new byte[length]; System.arraycopy(context, 0, newbyte, 0, length); assignPartitionSend("TIMES" + a, newbyte); a++; } fis.close(); } public void close() { producer.flush(); producer.close(); } public static void main(String[] args) throws IOException { HomeWork homeWork = new HomeWork(); try { homeWork.preparLocalData(); } catch (IOException e) { e.printStackTrace(); } homeWork.close(); }}
将数据传到Kafka上之后要想查看 可以通过zookeeper的客户端但是什么都看不懂因为是字节流(需要注意的一点是文件传到kafka上分区为0不然的话就会出现文件对应不上 )
最后想看的话就是取回来 下面的累就是将kafka上的文件数据传回到本地package com.ocean.kafka;import java.io.File;import java.io.FileOutputStream;import java.io.IOException;import java.util.ArrayList;import java.util.List;import java.util.Properties;import org.apache.kafka.clients.consumer.ConsumerRecord;import org.apache.kafka.clients.consumer.ConsumerRecords;import org.apache.kafka.clients.consumer.KafkaConsumer;public class HomeWork2 { private Properties properties = new Properties(); private KafkaConsumerconsumer; public HomeWork2() { properties = new Properties(); properties.put("bootstrap.servers", "master:9092"); properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); properties.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer"); properties.setProperty("group.id", "home-work_pic"); consumer = new KafkaConsumer (properties); } public void getfile() throws IOException { File file =new File("C:\\Users\\Administrator\\Desktop\\output.jpg"); FileOutputStream fileOutputStream =new FileOutputStream(file,true); List topics =new ArrayList (); topics.add("home-work_pic"); consumer.subscribe(topics); while(true){ ConsumerRecords records =consumer.poll(1000); for (ConsumerRecord record : records) { if(record.value()!=null){ System.out.println(record.value()); byte[] b =record.value(); fileOutputStream.write(b); fileOutputStream.flush(); } } } } public static void main(String[] args) { HomeWork2 homeWork2 =new HomeWork2(); try { homeWork2.getfile(); } catch (IOException e) { e.printStackTrace(); } }}
这就是一个简单的设置将文件以字节流的方式上传和下载从kafka上
转载地址:http://cenwl.baihongyu.com/