This post walks through a worked example of storm-kafka-client. Many newcomers find it hard to wire the new Kafka spout into a Storm topology, so this article collects a complete working example, the build configuration, and a short note on the lambda syntax it relies on; hopefully it resolves the problem for you.

package hgs.core.sk;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.kafka.spout.ByTopicRecordTranslator;
import org.apache.storm.kafka.spout.KafkaSpout;
import org.apache.storm.kafka.spout.KafkaSpoutConfig;
import org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
// References:
//https://community.hortonworks.com/articles/87597/how-to-write-topology-with-the-new-kafka-spout-cli.html
//https://github.com/apache/storm/blob/master/examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTopologyMainNamedTopics.java#L52
public class StormKafkaMainTest {
    public static void main(String[] args) {
        TopologyBuilder builder = new TopologyBuilder();
        // The translator converts each incoming Kafka record into a Storm tuple.
        ByTopicRecordTranslator<String, String> brt =
                new ByTopicRecordTranslator<>((r) -> new Values(r.value(), r.topic()), new Fields("values", "test7"));
        // Register the topic to consume, i.e. test7.
        brt.forTopic("test7", (r) -> new Values(r.value(), r.topic()), new Fields("values", "test7"));
        // Plays the role of the old SpoutConfig.
        KafkaSpoutConfig<String, String> ksc = KafkaSpoutConfig
                // bootstrap servers and the topic (test7)
                .builder("bigdata01:9092,bigdata02:9092,bigdata03:9092", "test7")
                // set group.id
                .setProp(ConsumerConfig.GROUP_ID_CONFIG, "skc-test")
                // set the starting offset position for consumption (LATEST: only records
                // written after the spout starts)
                .setFirstPollOffsetStrategy(FirstPollOffsetStrategy.LATEST)
                // set the interval, in milliseconds, at which consumed offsets are committed
                .setOffsetCommitPeriodMs(10_000)
                // the record translator defined above
                .setRecordTranslator(brt)
                .build();
        builder.setSpout("kafkaspout", new KafkaSpout<>(ksc), 2);
        builder.setBolt("mybolt1", new MyboltO(), 4).shuffleGrouping("kafkaspout");
        Config config = new Config();
        config.setNumWorkers(2);
        // 0 ackers: tuples are not tracked, so the bolt does not need to ack them
        config.setNumAckers(0);
        try {
            StormSubmitter.submitTopology("storm-kafka-clients", config, builder.createTopology());
        } catch (Exception e) {
            e.printStackTrace();
        }
        /* For local testing:
        LocalCluster cu = new LocalCluster();
        cu.submitTopology("test", config, builder.createTopology()); */
    }
}
class MyboltO extends BaseRichBolt {
    private static final long serialVersionUID = 1L;
    private OutputCollector collector = null;

    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    public void execute(Tuple input) {
        // Print the message here; the output can be found in the log of the
        // worker that runs this bolt.
        String out = input.getString(0);
        System.out.println(out);
        // collector.ack(input); // not needed: the topology runs with 0 ackers
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    }
}
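To verify the topology end to end, something has to write into test7. Below is a minimal, hypothetical producer sketch (not part of the original post) built on the kafka-clients dependency from the pom that follows; the broker list and topic name are taken from the topology above, and the class name TestProducer is invented for illustration:

package hgs.core.sk;

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

// Hypothetical helper: pushes a few test messages into the test7 topic
// so the running topology has something to consume and print.
public class TestProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "bigdata01:9092,bigdata02:9092,bigdata03:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            for (int i = 0; i < 10; i++) {
                producer.send(new ProducerRecord<>("test7", "message-" + i));
            }
        }
    }
}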
The pom.xml file:

<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>hgs</groupId>
  <artifactId>core.sk</artifactId>
  <version>1.0.0-SNAPSHOT</version>
  <packaging>jar</packaging>
  <name>core.sk</name>
  <url>http://maven.apache.org</url>
  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  </properties>
  <dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>3.8.1</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.storm</groupId>
      <artifactId>storm-kafka-client</artifactId>
      <version>1.1.3</version>
    </dependency>
    <dependency>
      <groupId>org.apache.storm</groupId>
      <artifactId>storm-core</artifactId>
      <version>1.1.3</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka_2.11</artifactId>
      <version>1.0.0</version>
      <exclusions>
        <exclusion>
          <groupId>org.slf4j</groupId>
          <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
        <exclusion>
          <groupId>org.apache.zookeeper</groupId>
          <artifactId>zookeeper</artifactId>
        </exclusion>
      </exclusions>
    </dependency>
    <dependency>
      <groupId>org.clojure</groupId>
      <artifactId>clojure</artifactId>
      <version>1.7.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>1.0.0</version>
    </dependency>
  </dependencies>
  <build>
    <plugins>
      <plugin>
        <artifactId>maven-assembly-plugin</artifactId>
        <version>2.2</version>
        <configuration>
          <archive>
            <manifest>
              <mainClass>hgs.core.sk.StormKafkaMainTest</mainClass>
            </manifest>
          </archive>
          <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
          </descriptorRefs>
        </configuration>
        <executions>
          <execution>
            <id>make-assembly</id>
            <phase>package</phase>
            <goals>
              <goal>single</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <configuration>
          <source>1.8</source>
          <target>1.8</target>
        </configuration>
      </plugin>
    </plugins>
  </build>
</project>
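With this pom, running mvn package should produce a fat jar (presumably core.sk-1.0.0-SNAPSHOT-jar-with-dependencies.jar, inferring the name from the artifactId and version above) that can be submitted to the cluster with storm jar <that-jar> hgs.core.sk.StormKafkaMainTest. Note that storm-core is scoped provided because the Storm cluster itself supplies it at runtime.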
// The following is a note on lambda expressions, since they are used heavily
// above; recorded here so it still makes sense when read later.
import java.util.Random;
import java.util.UUID;

import org.junit.jupiter.api.Test;

public class TEst {
    @Test
    public void sysConfig() {
        String[] ags = {"this is my first storm program so i hope it will succeed",
                "i love basketball",
                "the day of my birthday i was alone"};
        String uuid = UUID.randomUUID().toString();
        String nexttuple = ags[new Random().nextInt(ags.length)];
        System.out.println(nexttuple);
    }

    @Test
    public void lambdaTest() {
        // prints 10*a here;
        // "(a) -> 10*a" is equivalent to "new testinter()" with its one method implemented
        printPerson((a) -> 10 * a);
    }

    void printPerson(testinter t) {
        // the t passed in takes one argument a, i.e. the method
        // sysoutitems(int a) declared in the interface below
        System.out.println(t.sysoutitems(100));
    }
}
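To make the equivalence in the comment concrete, here is a minimal sketch (the class name LambdaEquivalentDemo is invented for illustration; it assumes the testinter interface defined below, in the same package) showing the anonymous-class form that the lambda (a) -> 10*a abbreviates:

// Hypothetical demo class: the lambda and the anonymous class below
// are two spellings of the same testinter implementation.
public class LambdaEquivalentDemo {
    public static void main(String[] args) {
        testinter t = new testinter() {
            @Override
            public int sysoutitems(int a) {
                return 10 * a;
            }
        };
        System.out.println(t.sysoutitems(100)); // prints 1000, same as printPerson((a) -> 10*a)
    }
}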
// Define the interface. For a lambda expression the target type must be an
// interface with exactly one abstract method (a functional interface).
interface testinter {
    int sysoutitems(int a);
    // void aAndb(int a, int b); // a second abstract method would break the lambda
}

Having read the above, have you got the hang of the storm-kafka-client example? Thanks for reading!