package datadog.trace.instrumentation.pulsar;

import datadog.slf4j.Logger;
import datadog.slf4j.LoggerFactory;
import datadog.trace.bootstrap.instrumentation.api.AgentScope;
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
import datadog.trace.bootstrap.instrumentation.api.AgentTracer;
import datadog.trace.bootstrap.instrumentation.api.UTF8BytesString;
import datadog.trace.bootstrap.instrumentation.decorator.BaseDecorator;
import java.util.Iterator;
import java.util.concurrent.CompletableFuture;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Messages;

/* loaded from: input_file:inst/datadog/trace/instrumentation/pulsar/ConsumerDecorator.classdata */
/**
 * Decorator that creates "receive" spans for messages consumed from Pulsar.
 *
 * <p>The static entry points are {@link #startAndEnd}, which records a single already-finished
 * span for one message, and {@link #wrap} / {@link #wrapBatch}, which decorate the futures
 * returned by asynchronous receive calls so that a span is recorded for every delivered message
 * before the caller observes the result.
 */
public class ConsumerDecorator extends BaseDecorator {
    private static final Logger log = LoggerFactory.getLogger(ConsumerDecorator.class);
    public static final CharSequence PULSAR_NAME = UTF8BytesString.create("queue");
    private static final String TOPIC = "topic";
    private static final String LOCAL_SERVICE_NAME = "pulsar";
    private static final String MESSAGING_ID = "messaging.id";
    private static final String MESSAGING_SYSTEM = "messaging.system";

    ConsumerDecorator() {
    }

    /**
     * Returns the instrumentation names this decorator is registered under.
     *
     * <p>NOTE(review): the decompiler reports that the access modifier was widened from
     * {@code protected}; confirm against the original source.
     */
    @Override
    public String[] instrumentationNames() {
        return new String[]{LOCAL_SERVICE_NAME, "pulsar-client"};
    }

    /** Returns the span type used for consumer spans ({@link #PULSAR_NAME}). */
    @Override
    protected CharSequence spanType() {
        return PULSAR_NAME;
    }

    /**
     * Returns the component name; this decorator does not define one.
     *
     * <p>NOTE(review): the decompiler reports that the access modifier was widened from
     * {@code protected}; confirm against the original source.
     */
    @Override
    public CharSequence component() {
        return null;
    }

    /**
     * Records a finished "receive" span for the message carried by {@code pulsarRequest}.
     *
     * <p>Nothing is recorded when {@code MessageStore} already holds an entry for the message.
     * Otherwise a span is started from the context extracted from the request, tagged, marked as
     * an error when {@code th} is non-null, finished immediately, and the scope is handed to
     * {@code MessageStore}.
     *
     * @param pulsarRequest request wrapping the received message
     * @param th failure associated with the receive, or {@code null} when it succeeded
     * @param str value recorded under the {@code broker_url} tag
     */
    public static void startAndEnd(PulsarRequest pulsarRequest, Throwable th, String str) {
        log.debug("into startAndEnd");
        if (MessageStore.extract(pulsarRequest.getMessage()) != null) {
            // The store already has an entry for this message: do not record it twice.
            return;
        }
        AgentSpan.Context.Extracted parentContext =
                AgentTracer.propagate().extract(pulsarRequest, MessageTextMapGetter.GETTER);
        String topicName = pulsarRequest.getMessage().getTopicName();
        UTF8BytesString operation = UTF8BytesString.create(topicName + " receive");
        AgentSpan span = AgentTracer.startSpan(operation, parentContext);
        span.setResourceName((CharSequence) operation);
        // NOTE(review): m1614setTag looks like a decompiler-generated alias of setTag — confirm.
        span.m1614setTag(TOPIC, topicName);
        span.m1614setTag("destination", pulsarRequest.getDestination());
        span.m1614setTag("broker_url", str);
        span.m1614setTag(MESSAGING_SYSTEM, LOCAL_SERVICE_NAME);
        span.setSpanType(PULSAR_NAME);
        span.setTag(MESSAGING_ID, pulsarRequest.getMessage().getMessageId());
        span.setServiceName(LOCAL_SERVICE_NAME);
        if (th != null) {
            span.setError(true);
            span.setErrorMessage(th.getMessage());
        }
        AgentScope scope = AgentTracer.activateSpan(span);
        try {
            scope.span().finish();
        } finally {
            // Always release the activated scope, even if finishing the span fails.
            scope.close();
        }
        log.debug("out startAndEnd");
        MessageStore.Inject(pulsarRequest.getMessage(), scope);
    }

    /**
     * Decorates a single-message receive future so that a span is recorded before the caller sees
     * the message.
     *
     * <p>The returned future always completes with the same outcome as {@code completableFuture}:
     * the same message (including {@code null}) on success, or the same throwable on failure.
     *
     * @param completableFuture future produced by the underlying receive call
     * @param str value recorded under the {@code broker_url} tag
     * @return a new future mirroring the outcome of {@code completableFuture}
     */
    public static CompletableFuture<Message<?>> wrap(CompletableFuture<Message<?>> completableFuture, String str) {
        log.debug("into wrap");
        CompletableFuture<Message<?>> wrapped = new CompletableFuture<>();
        completableFuture.whenComplete((message, th) -> {
            if (message != null) {
                traceQuietly(message, th, str);
            }
            // Propagate unconditionally: on failure the message is null, and skipping this
            // step would leave the caller's future pending forever.
            runWithContext(() -> propagate(wrapped, message, th));
        });
        log.debug("out wrap");
        return wrapped;
    }

    /**
     * Decorates a batch receive future so that a span is recorded for every non-null message in
     * the batch before the caller sees it.
     *
     * <p>The returned future always completes with the same outcome as {@code completableFuture}:
     * the same batch (including {@code null}) on success, or the same throwable on failure.
     *
     * @param completableFuture future produced by the underlying batch receive call
     * @param str value recorded under the {@code broker_url} tag
     * @return a new future mirroring the outcome of {@code completableFuture}
     */
    public static CompletableFuture<Messages<?>> wrapBatch(CompletableFuture<Messages<?>> completableFuture, String str) {
        CompletableFuture<Messages<?>> wrapped = new CompletableFuture<>();
        completableFuture.whenComplete((messages, th) -> {
            if (messages != null) {
                for (Message<?> message : messages) {
                    if (message != null) {
                        traceQuietly(message, th, str);
                    }
                }
            }
            // Propagate unconditionally so failures reach the caller instead of hanging it.
            runWithContext(() -> propagate(wrapped, messages, th));
        });
        return wrapped;
    }

    /** Records the span for one message; tracing failures must never block message delivery. */
    private static void traceQuietly(Message<?> message, Throwable th, String brokerUrl) {
        try {
            startAndEnd(PulsarRequest.create(message), th, brokerUrl);
        } catch (RuntimeException e) {
            log.debug("failed to record pulsar receive span", e);
        }
    }

    /** Completes {@code target} with {@code error} when it is non-null, otherwise with {@code value}. */
    private static <T> void propagate(CompletableFuture<T> target, T value, Throwable error) {
        if (error != null) {
            target.completeExceptionally(error);
        } else {
            target.complete(value);
        }
    }

    /** Runs {@code runnable} on the calling thread and logs once it has returned. */
    private static void runWithContext(Runnable runnable) {
        runnable.run();
        log.debug("out runWithContext");
    }
}
