package com.cloudt.apm.common.kafka;

import com.cloudt.apm.common.config.ApmConfig;
import com.cloudt.apm.common.model.CommonConstant;
import com.cloudt.apm.common.model.Span;
import com.cloudt.apm.common.utils.ApmAgentUtil;
import java.util.Properties;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/cloudt/apm/common/kafka/KafkaProducerTool.class */
public class KafkaProducerTool {
    private static final Logger log = LoggerFactory.getLogger(KafkaProducerTool.class);
    private static Producer<String, String> producer = null;
    private static Properties properties = null;

    public static void initKafka(Properties properties2) {
        if (properties == null) {
            properties = new Properties();
            if (properties2.getProperty(ApmConfig.PUSH_MQ_SERVER) == null || properties2.getProperty(ApmConfig.PUSH_MQ_SERVER).length() == 0) {
                throw new RuntimeException("err:kafka_service_url is null");
            }
            properties.put("bootstrap.servers", properties2.getProperty(ApmConfig.PUSH_MQ_SERVER));
            String property = properties2.getProperty(ApmConfig.KafkaConfig.KAFKA_ACKS_CONFIG);
            if (property == null || property.length() == 0) {
                properties.put("acks", "1");
            } else {
                properties.put("acks", property);
            }
            String property2 = properties2.getProperty(ApmConfig.KafkaConfig.KAFKA_RETRIES_CONFIG);
            if (property2 == null || property2.length() == 0) {
                properties.put("retries", 0);
            } else {
                properties.put("retries", property2);
            }
            String property3 = properties2.getProperty(ApmConfig.KafkaConfig.KAFKA_MAX_BLOCK_MS_CONFIG);
            if (property3 == null || property3.length() == 0) {
                System.out.println(properties);
                System.out.println(properties.get("max.block.ms"));
            } else {
                properties.put("max.block.ms", property3);
            }
            properties.put("retry.backoff.ms", 100);
            properties.put("buffer.memory", 33554432);
            properties.put("batch.size", 16384);
            String property4 = properties2.getProperty(ApmConfig.KafkaConfig.KAFKA_LINGER_MS_CONFIG);
            if (property4 == null || property4.length() == 0) {
                properties.put("linger.ms", 10);
            } else {
                properties.put("linger.ms", property4);
            }
            properties.put("key.serializer", StringSerializer.class.getName());
            properties.put("value.serializer", StringSerializer.class.getName());
        }
        if (producer == null) {
            producer = new KafkaProducer(properties);
        }
    }

    public static boolean sendCollectInfoMessage(final Span span, Properties properties2) {
        try {
            if (producer == null) {
                initKafka(properties2);
            }
            String objectToJson = ApmAgentUtil.objectToJson(ApmAgentUtil.apmDataModelToMap(span));
            String property = properties2.getProperty(ApmConfig.PUSH_MQ_TOPIC);
            producer.send(new ProducerRecord((property == null || property.length() == 0) ? CommonConstant.CLOUDT_AGENT_TOPIC : property, 0, CommonConstant.CONTROLLER_TOPIC_KEY, objectToJson), new Callback() { // from class: com.cloudt.apm.common.kafka.KafkaProducerTool.1
                public void onCompletion(RecordMetadata recordMetadata, Exception exc) {
                    if (exc != null) {
                        KafkaProducerTool.log.error("异步方式发送消息失败：" + exc.getMessage(), exc);
                        return;
                    }
                    if (recordMetadata != null) {
                        Logger logger = KafkaProducerTool.log;
                        String traceId = Span.this.getTrace().getTraceId();
                        String str = recordMetadata.topic();
                        long offset = recordMetadata.offset();
                        boolean hasOffset = recordMetadata.hasOffset();
                        recordMetadata.partition();
                        logger.info("异步方式发送消息结果：" + traceId + "-" + str + "-" + offset + "-" + logger + "-" + hasOffset);
                    }
                }
            });
            return true;
        } catch (Exception e) {
            log.error("kafka消息异常：" + e.getMessage(), e);
            return false;
        }
    }
}
