public class ConsumerTest { public static volatile boolean isRunning = true; public static KafkaConsumer consumer; public static HashMap> eventDateMap = new HashMap<>(); public static ArrayList userList = new ArrayList<>(); public static void main(String[] args) { Properties props = new Properties(); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "172:9092"); props.put(ConsumerConfig.GROUP_ID_CONFIG, "All03"); props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "10000"); props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000"); props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000"); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); consumer = new KafkaConsumer<>(props); String topic = "hndj"; consumer.subscribe(Collections.singletonList(topic)); try { while (isRunning) { ConsumerRecords records = consumer.poll(Duration.ofSeconds(10)); if (!records.isEmpty()) { for (ConsumerRecord record : records) { String value = record.value(); JSonObject jsonObject = JSONObject.parseObject(value); String data = jsonObject.getString("data"); if (!StringUtils.isEmpty(data) && data.startsWith("[")) { JSonArray objects = JSONArray.parseArray(data); for (int i = 0; i < objects.size(); i++) { JSonObject object = objects.getJSonObject(i); String type = object.getString("type"); if (type.contains("track")) { // 转换日期 Long time = object.getLong("time"); String date = new SimpleDateFormat("yyyy-MM-dd").format(new Date(time)); String eventName = object.getString("event"); HashMap eventMap = eventDateMap.get(date); if (eventDateMap.containsKey(date)) { if (eventMap != null && eventMap.containsKey(eventName)) { Integer ad = eventMap.get(eventName); ad++; eventMap.put(eventName, ad); } else { assert eventMap != null; eventMap.put(eventName, 1); } } else { eventMap = new HashMap<>(4); eventMap.put(eventName, 1); eventDateMap.put(date, eventMap); } } else { String userId = object.getString("distinct_id"); if (!StringUtils.isEmpty(userId)){ userList.add(userId); } } object = null; } } } } else { close(); } } } finally { close(); } // 计算日期下每个事件的总数 if (eventDateMap != null) { for (Map.Entry> mapEntry : eventDateMap.entrySet()) { HashMap map = mapEntry.getValue(); if (map != null) { for (Map.Entry entry : map.entrySet()) { log.info("日期:{}--eventName:{}--eventCount:{}", mapEntry.getKey(), entry.getKey(), entry.getValue()); } } } } // 计算日期对应的总数 if (eventDateMap != null) { for (Map.Entry> mapEntry : eventDateMap.entrySet()) { Integer eventCount = 0; HashMap map = mapEntry.getValue(); if (map != null) { for (Map.Entry entry : map.entrySet()) { eventCount += entry.getValue(); } log.info("日期:{}--event总数:{}", mapEntry.getKey(), eventCount); } } } log.info("user数据总数:{}", userList.size()); } private static void close() { isRunning = false; if (consumer != null) { consumer.close(); } }}