Instrumentation

This section of the documentation is aimed at users who want to add instrumentation to their codebase.

This section shows some common examples of reusing existing Micrometer and Micrometer Tracing handlers and context types to do instrumentation.

Before you decide to instrument a project yourself, double-check that it has not already been instrumented!

To better convey how you can do instrumentation, we need to distinguish two concepts:

  • Context propagation

  • Creation of Observations

Context propagation - We propagate existing context across threads or over the network. We use the Micrometer Context Propagation library to define the context and to propagate it across threads. We use dedicated SenderContext and ReceiverContext objects, together with Micrometer Tracing handlers, to create Observations that propagate context over the wire.

Creation of Observations - We want to wrap an operation in an Observation to get measurements. We need to know whether a parent Observation already exists so that we can maintain the parent-child relationship of Observations.
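
The two concepts can be illustrated without any Micrometer types. The following is only a minimal sketch under stated assumptions: CurrentOperationSketch, wrap, describeChild and runOnWorker are hypothetical names, and a plain ThreadLocal of String stands in for the current Observation. It shows why a value held in a thread local has to be captured and restored when work moves to another thread, and how a newly created operation can find its parent by looking at what is current.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical stand-in, not a Micrometer type: it tracks the name of the
// "current" operation per thread, similar in spirit to an Observation scope.
final class CurrentOperationSketch {

    private static final ThreadLocal<String> CURRENT = new ThreadLocal<>();

    // Context propagation: capture the caller's value now, restore it in the
    // thread that eventually runs the task, and put the old value back afterwards.
    static Runnable wrap(Runnable task) {
        String captured = CURRENT.get();
        return () -> {
            String previous = CURRENT.get();
            CURRENT.set(captured);
            try {
                task.run();
            }
            finally {
                CURRENT.set(previous);
            }
        };
    }

    // Creation: a new operation records whatever is current as its parent.
    static String describeChild(String childName) {
        String parent = CURRENT.get();
        return childName + " <- " + (parent == null ? "no parent" : parent);
    }

    // Runs a child operation on a worker thread, with or without propagation.
    static String runOnWorker(boolean propagate) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        CURRENT.set("parent");
        try {
            String[] result = new String[1];
            Runnable task = () -> result[0] = describeChild("child");
            executor.submit(propagate ? wrap(task) : task).get();
            return result[0];
        }
        catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }
        finally {
            CURRENT.remove();
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runOnWorker(true));  // child <- parent
        System.out.println(runOnWorker(false)); // child <- no parent
    }
}
```

Without the wrapper, the worker thread sees no current value and the child loses its parent. Closing that gap across threads is the job of the Micrometer Context Propagation library.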

Instrumentation of HTTP Communication

This section shows how to instrument libraries that communicate over HTTP.

Instrumentation of HTTP Client Communication

Explanation of HTTP client side instrumentation

┌─────────────────────────────┐┌───────────┐┌───────────────────────┐┌───────────────┐┌─────────────────────────┐┌──────────────────────┐┌──────────────────────┐
│[1] RequestReplySenderContext││[2] Carrier││[3] ObservationRegistry││[4] Observation││[5] ObservationConvention││[6] ObservationHandler││[7] Code to Instrument│
└──────────────┬──────────────┘└─────┬─────┘└───────────┬───────────┘└───────┬───────┘└────────────┬────────────┘└──────────┬───────────┘└──────────┬───────────┘
               │                     │                  │                    │                     │                        │                       │
               │        Wrap         │                  │                    │                     │                        │                       │
               │────────────────────>│                  │                    │                     │                        │                       │
               │                     │                  │                    │                     │                        │                       │
               │                     │                  │       Create       │                     │                        │                       │
               │                     │                  │───────────────────>│                     │                        │                       │
               │                     │                  │                    │                     │                        │                       │
               │                     │     Create       │                    │                     │                        │                       │
               │────────────────────────────────────────────────────────────>│                     │                        │                       │
               │                     │                  │                    │                     │                        │                       │
               │                     │                  │                    │       Create        │                        │                       │
               │                     │                  │                    │<────────────────────│                        │                       │
               │                     │                  │                    │                     │                        │                       │
               │                     │                  │                    │                   onStart                    │                       │
               │                     │                  │                    │─────────────────────────────────────────────>│                       │
               │                     │                  │                    │                     │                        │                       │
               │                     │                  │                    │                     │      Wrap in Scope     │                       │
               │                     │                  │                    │─────────────────────────────────────────────────────────────────────>│
┌──────────────┴──────────────┐┌─────┴─────┐┌───────────┴───────────┐┌───────┴───────┐┌────────────┴────────────┐┌──────────┴───────────┐┌──────────┴───────────┐
│[1] RequestReplySenderContext││[2] Carrier││[3] ObservationRegistry││[4] Observation││[5] ObservationConvention││[6] ObservationHandler││[7] Code to Instrument│
└─────────────────────────────┘└───────────┘└───────────────────────┘└───────────────┘└─────────────────────────┘└──────────────────────┘└──────────────────────┘
  • In the <3> ObservationRegistry register a <6> handler that will propagate context (e.g. PropagatingSenderTracingObservationHandler from Micrometer Tracing)

  • Create a <1> RequestReplySenderContext that wraps a <2> carrier (e.g. HttpRequest)

    • In its constructor explain how to enrich the headers (e.g. (carrier, key, value) → carrier.header(key, value))

    • Set the <2> carrier on the <1> RequestReplySenderContext

  • Create an <4> Observation, optionally using the <5> ObservationConvention with the sender context

    • On <4> Observation start, propagation will happen (e.g. carrier will be enriched with proper headers) via an <6> ObservationHandler

  • Wrap the <7> code to instrument (e.g. sending of an HTTP request) in scope (e.g. through the observe or scoped method)

Instrumentation of HTTP Server Communication

Explanation of HTTP server side instrumentation

┌───────────────────────────────┐┌───────────┐┌───────────────────────┐┌───────────────┐┌─────────────────────────┐┌──────────────────────┐┌──────────────────────┐
│[1] RequestReplyReceiverContext││[2] Carrier││[3] ObservationRegistry││[4] Observation││[5] ObservationConvention││[6] ObservationHandler││[7] Code to Instrument│
└───────────────┬───────────────┘└─────┬─────┘└───────────┬───────────┘└───────┬───────┘└────────────┬────────────┘└──────────┬───────────┘└──────────┬───────────┘
                │                      │                  │                    │                     │                        │                       │
                │         Wrap         │                  │                    │                     │                        │                       │
                │─────────────────────>│                  │                    │                     │                        │                       │
                │                      │                  │                    │                     │                        │                       │
                │                      │                  │       Create       │                     │                        │                       │
                │                      │                  │───────────────────>│                     │                        │                       │
                │                      │                  │                    │                     │                        │                       │
                │                      │     Create       │                    │                     │                        │                       │
                │─────────────────────────────────────────────────────────────>│                     │                        │                       │
                │                      │                  │                    │                     │                        │                       │
                │                      │                  │                    │       Create        │                        │                       │
                │                      │                  │                    │<────────────────────│                        │                       │
                │                      │                  │                    │                     │                        │                       │
                │                      │                  │                    │                   onStart                    │                       │
                │                      │                  │                    │─────────────────────────────────────────────>│                       │
                │                      │                  │                    │                     │                        │                       │
                │                      │                  │                    │                     │      Wrap in Scope     │                       │
                │                      │                  │                    │─────────────────────────────────────────────────────────────────────>│
┌───────────────┴───────────────┐┌─────┴─────┐┌───────────┴───────────┐┌───────┴───────┐┌────────────┴────────────┐┌──────────┴───────────┐┌──────────┴───────────┐
│[1] RequestReplyReceiverContext││[2] Carrier││[3] ObservationRegistry││[4] Observation││[5] ObservationConvention││[6] ObservationHandler││[7] Code to Instrument│
└───────────────────────────────┘└───────────┘└───────────────────────┘└───────────────┘└─────────────────────────┘└──────────────────────┘└──────────────────────┘
  • In the <3> ObservationRegistry register a <6> handler that will propagate context (e.g. PropagatingReceiverTracingObservationHandler from Micrometer Tracing)

  • Create a <1> RequestReplyReceiverContext that wraps a <2> carrier (e.g. HttpRequest)

    • In its constructor explain how to retrieve the header values (e.g. (carrier, key) → carrier.header(key))

    • Set the <2> carrier on the <1> RequestReplyReceiverContext

  • Create an <4> Observation, optionally using the <5> ObservationConvention with the receiver context

    • On <4> Observation start, propagation will happen (e.g. the propagated context will be read from the carrier’s headers) via an <6> ObservationHandler

  • Wrap the <7> code to instrument (e.g. processing of an HTTP request) in scope (e.g. through the observe or scoped method)
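
The header-enriching and header-reading lambdas from the client-side and server-side steps can be sketched in isolation. This is a hedged illustration only: HeaderPropagationSketch and its nested Setter and Getter interfaces are hypothetical stand-ins that merely mirror the lambda shapes, and a plain Map plays the role of the HTTP request carrier.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-ins that only mirror the lambda shapes used in this
// section; they are not Micrometer's own types.
final class HeaderPropagationSketch {

    @FunctionalInterface
    interface Setter<C> {
        void set(C carrier, String key, String value);
    }

    @FunctionalInterface
    interface Getter<C> {
        String get(C carrier, String key);
    }

    // Client side: roughly what a propagating handler does on Observation start.
    static <C> void inject(C carrier, Setter<C> setter, Map<String, String> context) {
        context.forEach((key, value) -> setter.set(carrier, key, value));
    }

    // Server side: read a propagated entry back from the incoming carrier.
    static <C> String extract(C carrier, Getter<C> getter, String key) {
        return getter.get(carrier, key);
    }

    public static void main(String[] args) {
        // The map plays the role of the HTTP request headers
        Map<String, String> requestHeaders = new HashMap<>();
        inject(requestHeaders, Map::put, Map.of("foo", "bar"));
        System.out.println(extract(requestHeaders, Map::get, "foo")); // bar
    }
}
```

The sender and receiver contexts only need to know how to write to and read from the carrier. What gets written (for example, tracing headers) is decided by the handler registered in the registry.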

Instrumentation of HTTP Communication Example

To instrument an HTTP-based communication, we need to use the RequestReplySenderContext and RequestReplyReceiverContext for the client and server side, respectively.

As an example for the client side, we use a handler that instruments the HTTP request by adding a foo:bar header (if you have Micrometer Tracing on the classpath, you could reuse the PropagatingSenderTracingObservationHandler and PropagatingReceiverTracingObservationHandler to propagate tracing context over the wire). Let’s consider an example of such a handler:

static class HeaderPropagatingHandler implements ObservationHandler<SenderContext<Object>> {

    @Override
    public void onStart(SenderContext<Object> context) {
        context.getSetter().set(context.getCarrier(), "foo", "bar");
    }

    @Override
    public boolean supportsContext(Observation.Context context) {
        return context instanceof SenderContext;
    }

}

Consider the following HTTP client side instrumentation that reuses the handler:

// This example can be combined with the idea of ObservationConvention to allow
// users to easily customize the key values. Please read the rest of the
// documentation on how to do it.

// In Micrometer Tracing we would have predefined
// PropagatingSenderTracingObservationHandler but for the sake of this demo we
// create our own handler that puts "foo":"bar" headers into the request
registry.observationConfig().observationHandler(new HeaderPropagatingHandler());

// We're using WireMock to stub the HTTP GET call to "/foo" with a response "OK"
stubFor(get("/foo").willReturn(ok().withBody("OK")));

// RequestReplySenderContext is a special type of context used for request-reply
// communication. It requires us to define what the Request type is and how we
// can instrument it. It also needs to know what the Response type is
RequestReplySenderContext<HttpUriRequestBase, ClassicHttpResponse> context = new RequestReplySenderContext<>(
        (carrier, key, value) -> Objects.requireNonNull(carrier).addHeader(key, value));

// We're instrumenting the Apache HTTPClient
try (CloseableHttpClient httpclient = HttpClients.createDefault()) {
    // The HttpGet is our carrier (we can mutate it to instrument the headers)
    HttpGet httpget = new HttpGet(info.getHttpBaseUrl() + "/foo");
    // We must set the carrier BEFORE we run <Observation#start>
    context.setCarrier(httpget);
    // You can set the remote service address to provide more debugging
    // information
    context.setRemoteServiceAddress(info.getHttpBaseUrl());
    // Examples of setting key values from the request
    Observation observation = Observation.createNotStarted("http.client.requests", () -> context, registry)
        .contextualName("HTTP " + httpget.getMethod())
        .lowCardinalityKeyValue("http.url", info.getHttpBaseUrl() + "/{name}")
        .highCardinalityKeyValue("http.full-url", httpget.getRequestUri());
    observation.observeChecked(() -> {
        String response = httpclient.execute(httpget, classicHttpResponse -> {
            // We should set the response before we stop the observation
            context.setResponse(classicHttpResponse);
            // Example of setting key values from the response
            observation.highCardinalityKeyValue("http.content.length",
                    String.valueOf(classicHttpResponse.getEntity().getContentLength()));
            return EntityUtils.toString(classicHttpResponse.getEntity());
        });

        then(response).isEqualTo("OK");
    });
}

// We want to be sure that we have successfully enriched the HTTP headers
verify(getRequestedFor(urlEqualTo("/foo")).withHeader("foo", equalTo("bar")));

As an example for the server side, we use a handler that instruments the Observation by adding the foo low cardinality key, whose value is the foo header from the HTTP request. Consider an example of such a handler:

static class HeaderReadingHandler implements ObservationHandler<ReceiverContext<Context>> {

    @Override
    public void onStart(ReceiverContext<Context> context) {
        String fooHeader = context.getGetter().get(context.getCarrier(), "foo");
        // We're setting the value of the <foo> header as a low cardinality key value
        context.addLowCardinalityKeyValue(KeyValue.of("foo", fooHeader));
    }

    @Override
    public boolean supportsContext(Observation.Context context) {
        return context instanceof ReceiverContext;
    }

}

Consider the following HTTP server side instrumentation that reuses the handler:

// This example can be combined with the idea of ObservationConvention to allow
// users to easily customize the key values. Please read the rest of the
// documentation on how to do it.

// In Micrometer Tracing we would have predefined
// PropagatingReceiverTracingObservationHandler but for the sake of this demo we
// create our own handler that will reuse the <foo> header from the request as a
// low cardinality key value
registry.observationConfig().observationHandler(new HeaderReadingHandler());

try (Javalin javalin = Javalin.create().before("/hello/{name}", ctx -> {
    // We're creating the special RequestReplyReceiverContext that will reuse the
    // information from the HTTP headers
    RequestReplyReceiverContext<Context, Context> receiverContext = new RequestReplyReceiverContext<>(
            Context::header);
    // Remember to set the carrier!!!
    receiverContext.setCarrier(ctx);
    String remoteServiceAddress = ctx.scheme() + "://" + ctx.host();
    receiverContext.setRemoteServiceAddress(remoteServiceAddress);
    // We're starting an Observation with the context
    Observation observation = Observation
        .createNotStarted("http.server.requests", () -> receiverContext, registry)
        .contextualName("HTTP " + ctx.method() + " " + ctx.matchedPath())
        .lowCardinalityKeyValue("http.url", remoteServiceAddress + ctx.matchedPath())
        .highCardinalityKeyValue("http.full-url", remoteServiceAddress + ctx.path())
        .lowCardinalityKeyValue("http.method", ctx.method().name())
        .start();
    // Let's be consistent and always set the Observation related objects under
    // the same key
    ctx.attribute(ObservationThreadLocalAccessor.KEY, observation);
}).get("/hello/{name}", ctx -> {
    // We need to be thread-safe - we're not using ThreadLocals, we're retrieving
    // information from the attributes
    Observation observation = ctx.attribute(ObservationThreadLocalAccessor.KEY);
    observation.scoped(() -> {
        // If we need thread locals (e.g. MDC entries) we can use <scoped()>
        log.info("We're using scoped - Observation in thread local here [" + registry.getCurrentObservation()
                + "]");
        then(registry.getCurrentObservation()).isNotNull();
    });
    // We're returning body
    ctx.result("Hello World [" + observation.getContext().getLowCardinalityKeyValue("foo").getValue() + "]");
}).after("/hello/{name}", ctx -> {
    // After sending the response we want to stop the Observation
    Observation observation = ctx.attribute(ObservationThreadLocalAccessor.KEY);
    observation.stop();
}).start(0)) {
    // We're sending an HTTP request with a <foo:bar> header. We're expecting that
    // it will be reused in the response
    String response = sendRequestToHelloEndpointWithHeader(javalin.port(), "foo", "bar");

    // The response must contain the value from the header
    then(response).isEqualTo("Hello World [bar]");
}

Instrumentation of Messaging Communication

This section shows how to instrument libraries that do fire-and-forget style communication.

Instrumentation of Messaging Producer Side

Explanation of messaging producer side instrumentation

┌─────────────────┐┌───────────┐┌───────────────────────┐┌───────────────┐┌─────────────────────────┐┌──────────────────────┐┌──────────────────────┐
│[1] SenderContext││[2] Carrier││[3] ObservationRegistry││[4] Observation││[5] ObservationConvention││[6] ObservationHandler││[7] Code to Instrument│
└────────┬────────┘└─────┬─────┘└───────────┬───────────┘└───────┬───────┘└────────────┬────────────┘└──────────┬───────────┘└──────────┬───────────┘
         │               │                  │                    │                     │                        │                       │
         │     Wrap      │                  │                    │                     │                        │                       │
         │──────────────>│                  │                    │                     │                        │                       │
         │               │                  │                    │                     │                        │                       │
         │               │                  │       Create       │                     │                        │                       │
         │               │                  │───────────────────>│                     │                        │                       │
         │               │                  │                    │                     │                        │                       │
         │               │        Create    │                    │                     │                        │                       │
         │──────────────────────────────────────────────────────>│                     │                        │                       │
         │               │                  │                    │                     │                        │                       │
         │               │                  │                    │       Create        │                        │                       │
         │               │                  │                    │<────────────────────│                        │                       │
         │               │                  │                    │                     │                        │                       │
         │               │                  │                    │                   onStart                    │                       │
         │               │                  │                    │─────────────────────────────────────────────>│                       │
         │               │                  │                    │                     │                        │                       │
         │               │                  │                    │                     │      Wrap in Scope     │                       │
         │               │                  │                    │─────────────────────────────────────────────────────────────────────>│
┌────────┴────────┐┌─────┴─────┐┌───────────┴───────────┐┌───────┴───────┐┌────────────┴────────────┐┌──────────┴───────────┐┌──────────┴───────────┐
│[1] SenderContext││[2] Carrier││[3] ObservationRegistry││[4] Observation││[5] ObservationConvention││[6] ObservationHandler││[7] Code to Instrument│
└─────────────────┘└───────────┘└───────────────────────┘└───────────────┘└─────────────────────────┘└──────────────────────┘└──────────────────────┘
  • In the <3> ObservationRegistry register a <6> handler that will propagate context (e.g. PropagatingSenderTracingObservationHandler from Micrometer Tracing)

  • Create a <1> SenderContext that wraps a <2> carrier (e.g. AmqpMessage)

    • In its constructor explain how to enrich the headers (e.g. (carrier, key, value) → carrier.header(key, value))

    • Set the <2> carrier on the <1> SenderContext

  • Create an <4> Observation, optionally using the <5> ObservationConvention with the sender context

    • On <4> Observation start, propagation will happen (e.g. carrier will be enriched with proper headers) via an <6> ObservationHandler

  • Wrap the <7> code to instrument (e.g. sending of an AMQP message) in scope (e.g. through the observe or scoped method)

Instrumentation of Messaging Consumer Side Communication

Explanation of messaging consumer side instrumentation

┌───────────────────┐┌───────────┐┌───────────────────────┐┌───────────────┐┌─────────────────────────┐┌──────────────────────┐┌──────────────────────┐
│[1] ReceiverContext││[2] Carrier││[3] ObservationRegistry││[4] Observation││[5] ObservationConvention││[6] ObservationHandler││[7] Code to Instrument│
└─────────┬─────────┘└─────┬─────┘└───────────┬───────────┘└───────┬───────┘└────────────┬────────────┘└──────────┬───────────┘└──────────┬───────────┘
          │                │                  │                    │                     │                        │                       │
          │      Wrap      │                  │                    │                     │                        │                       │
          │───────────────>│                  │                    │                     │                        │                       │
          │                │                  │                    │                     │                        │                       │
          │                │                  │       Create       │                     │                        │                       │
          │                │                  │───────────────────>│                     │                        │                       │
          │                │                  │                    │                     │                        │                       │
          │                │        Create    │                    │                     │                        │                       │
          │───────────────────────────────────────────────────────>│                     │                        │                       │
          │                │                  │                    │                     │                        │                       │
          │                │                  │                    │       Create        │                        │                       │
          │                │                  │                    │<────────────────────│                        │                       │
          │                │                  │                    │                     │                        │                       │
          │                │                  │                    │                   onStart                    │                       │
          │                │                  │                    │─────────────────────────────────────────────>│                       │
          │                │                  │                    │                     │                        │                       │
          │                │                  │                    │                     │      Wrap in Scope     │                       │
          │                │                  │                    │─────────────────────────────────────────────────────────────────────>│
┌─────────┴─────────┐┌─────┴─────┐┌───────────┴───────────┐┌───────┴───────┐┌────────────┴────────────┐┌──────────┴───────────┐┌──────────┴───────────┐
│[1] ReceiverContext││[2] Carrier││[3] ObservationRegistry││[4] Observation││[5] ObservationConvention││[6] ObservationHandler││[7] Code to Instrument│
└───────────────────┘└───────────┘└───────────────────────┘└───────────────┘└─────────────────────────┘└──────────────────────┘└──────────────────────┘
  • In the <3> ObservationRegistry register a <6> handler that will propagate context (e.g. PropagatingReceiverTracingObservationHandler from Micrometer Tracing)

  • Create a <1> ReceiverContext that wraps a <2> carrier (e.g. AmqpMessage)

    • In its constructor explain how to retrieve the header values (e.g. (carrier, key) → carrier.header(key))

    • Set the <2> carrier on the <1> ReceiverContext

  • Create an <4> Observation, optionally using the <5> ObservationConvention with the receiver context

    • On <4> Observation start, propagation will happen (e.g. the propagated context will be read from the carrier’s headers) via an <6> ObservationHandler

  • Wrap the <7> code to instrument (e.g. processing of an AMQP message) in scope (e.g. through the observe or scoped method)

    • With some libraries (e.g. RabbitMQ), you might not have a handle on the user’s code. In that case, you may need to let the framework start the consumer side Observation and open its scope (putting values in thread local), and require users to close the scope and stop the Observation manually later in their own code!

Instrumentation of Messaging Communication Example

To instrument messaging-based communication, we need to use the SenderContext and ReceiverContext for the producer and consumer side, respectively.

In this section, we create a simple instrumentation for Apache Kafka.

As an example for the producer side, we use a handler that instruments the message by adding a foo:bar header (if you have Micrometer Tracing on the classpath, you could reuse the PropagatingSenderTracingObservationHandler and PropagatingReceiverTracingObservationHandler to propagate tracing context over the wire). Consider the following example of the KafkaSenderContext:

static class KafkaSenderContext extends SenderContext<ProducerRecord<String, String>> {

    public KafkaSenderContext(ProducerRecord<String, String> producerRecord) {
        // We describe how the carrier will be mutated (we mutate headers)
        super((carrier, key, value) -> carrier.headers().add(key, value.getBytes(StandardCharsets.UTF_8)));
        setCarrier(producerRecord);
    }

}

Consider the following example of the aforementioned handler:

static class HeaderPropagatingHandler implements ObservationHandler<KafkaSenderContext> {

    @Override
    public void onStart(KafkaSenderContext context) {
        context.getSetter().set(context.getCarrier(), "foo", "bar");
        context.addLowCardinalityKeyValue(KeyValue.of("sent", "true"));
    }

    @Override
    public boolean supportsContext(Observation.Context context) {
        return context instanceof KafkaSenderContext;
    }

}

Consider the following code that is the ProducerInterceptor for Kafka:

public class ProducerInterceptorConfig implements ProducerInterceptor<String, String> {

    private ObservationRegistry observationRegistry;

    private Observation observation;

    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        // This code will be called before the message gets sent. We create
        // a context and pass it to an Observation. Upon start, the handler will be called
        // and the ProducerRecord will be mutated
        KafkaSenderContext context = new KafkaSenderContext(record);
        this.observation = Observation.start("kafka.send", () -> context, observationRegistry);
        // We return the mutated carrier
        return context.getCarrier();
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        // Once the message got sent (with or without an exception) we attach the
        // exception, if there is one, and stop the observation
        if (exception != null) {
            this.observation.error(exception);
        }
        this.observation.stop();
    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> configs) {
        // We retrieve the ObservationRegistry from the configuration
        this.observationRegistry = (ObservationRegistry) configs.get(ObservationRegistry.class.getName());
    }

}

Consider the following code of the producer side instrumentation that reuses the handler:

TestObservationRegistry registry = TestObservationRegistry.create();

// In Micrometer Tracing we would have predefined
// PropagatingSenderTracingObservationHandler but for the sake of this demo we
// create our own handler that puts "foo":"bar" headers into the request and will
// set the low cardinality key "sent" to "true".
registry.observationConfig().observationHandler(new HeaderPropagatingHandler());

// Producer side...
Properties producerConfigs = new Properties();
producerConfigs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaContainer.getBootstrapServers());
producerConfigs.put(ProducerConfig.CLIENT_ID_CONFIG, UUID.randomUUID().toString());
producerConfigs.put(ObservationRegistry.class.getName(), registry);
producerConfigs.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
        "io.micrometer.docs.observation.messaging.ProducerInterceptorConfig");
Producer<String, String> producer = new KafkaProducer<>(producerConfigs, new StringSerializer(),
        new StringSerializer());

// Producer sends a message
producer.send(new ProducerRecord<>(topic, "foo"));
producer.flush();

As an example for the consumer side, we use a handler that instruments the Observation by adding the received foo header low cardinality key, whose value is the foo header from the message. Consider the following example of the KafkaReceiverContext:

static class KafkaReceiverContext extends ReceiverContext<ConsumerRecords<String, String>> {

    public KafkaReceiverContext(ConsumerRecords<String, String> consumerRecord) {
        // We describe how to read entries from the carrier (we read headers)
        super((carrier, key) -> {
            // This is a very naive approach that takes the first ConsumerRecord
            Header header = carrier.iterator().next().headers().lastHeader(key);
            if (header != null) {
                return new String(header.value(), StandardCharsets.UTF_8);
            }
            return null;
        });
        setCarrier(consumerRecord);
    }

}
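
The getter passed to the ReceiverContext constructor is essentially a function from a carrier and a key to a String value. The following JDK-only sketch shows a slightly more defensive variant of that idea: it returns null for an empty batch and decodes the header with an explicit charset. The list-based "record" and "batch" types and the HeaderGetterSketch class are hypothetical stand-ins chosen for illustration; they are not Kafka or Micrometer types.

```java
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;

// JDK-only sketch. A "record" is modelled as a list of header entries and a
// "batch" as a list of records; these are hypothetical stand-ins, not Kafka types.
class HeaderGetterSketch {

    // Returns the value of the last header named <key> in the first record of
    // the batch, or null when the batch is empty or the header is absent.
    static String get(List<List<Map.Entry<String, byte[]>>> batch, String key) {
        if (batch.isEmpty()) {
            return null;
        }
        String value = null;
        for (Map.Entry<String, byte[]> header : batch.get(0)) {
            if (header.getKey().equals(key)) {
                // Decode with an explicit charset rather than the platform default
                value = new String(header.getValue(), StandardCharsets.UTF_8);
            }
        }
        return value;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, byte[]>> record = List
            .of(Map.entry("foo", "bar".getBytes(StandardCharsets.UTF_8)));
        System.out.println(get(List.of(record), "foo"));
        System.out.println(get(List.of(record), "missing"));
        System.out.println(get(List.of(), "foo"));
    }

}
```

With a real ConsumerRecords carrier, the same guard would amount to checking that the batch is not empty before reading the first record.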

Consider the following example of the aforementioned handler.

static class HeaderReadingHandler implements ObservationHandler<KafkaReceiverContext> {

    @Override
    public void onStart(KafkaReceiverContext context) {
        String fooHeader = context.getGetter().get(context.getCarrier(), "foo");
        // We're setting the value of the <foo> header as a low cardinality key value
        context.addLowCardinalityKeyValue(KeyValue.of("received foo header", fooHeader));
    }

    @Override
    public boolean supportsContext(Observation.Context context) {
        return context instanceof KafkaReceiverContext;
    }

}
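
The supportsContext method is what keeps a handler from being invoked for contexts it does not understand. The following JDK-only sketch illustrates that filtering under simplified assumptions: the Handler interface, the MessagingContext and HttpContext classes, and the start method are all hypothetical and only mimic the shape of the real API, they are not Micrometer's implementation.

```java
import java.util.ArrayList;
import java.util.List;

// JDK-only sketch with hypothetical types; this is not Micrometer's API.
class HandlerDispatchSketch {

    // Simplified handler contract, loosely modelled on ObservationHandler
    interface Handler {

        boolean supportsContext(Object context);

        void onStart(Object context, List<String> events);

    }

    static final class MessagingContext {

    }

    static final class HttpContext {

    }

    // Only reacts to MessagingContext, in the same way that HeaderReadingHandler
    // only reacts to KafkaReceiverContext
    static final Handler MESSAGING_HANDLER = new Handler() {
        @Override
        public boolean supportsContext(Object context) {
            return context instanceof MessagingContext;
        }

        @Override
        public void onStart(Object context, List<String> events) {
            events.add("messaging handler started");
        }
    };

    // Notifies only those handlers that declare support for the given context
    static List<String> start(Object context, List<Handler> handlers) {
        List<String> events = new ArrayList<>();
        for (Handler handler : handlers) {
            if (handler.supportsContext(context)) {
                handler.onStart(context, events);
            }
        }
        return events;
    }

    public static void main(String[] args) {
        List<Handler> handlers = List.of(MESSAGING_HANDLER);
        System.out.println(start(new MessagingContext(), handlers));
        System.out.println(start(new HttpContext(), handlers));
    }

}
```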

Consider the following ConsumerInterceptor implementation for Kafka:

public class ConsumerInterceptorConfig implements ConsumerInterceptor<String, String> {

    private ObservationRegistry observationRegistry;

    @Override
    public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {
        // We're creating the receiver context
        KafkaReceiverContext context = new KafkaReceiverContext(records);
        // Then, we're just starting and stopping the observation on the consumer side
        Observation.start("kafka.receive", () -> context, observationRegistry).stop();
        // We could put the Observation in scope so that the users can propagate it
        // further on
        return context.getCarrier();
    }

    @Override
    public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {

    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> configs) {
        // We retrieve the ObservationRegistry from the configuration
        this.observationRegistry = (ObservationRegistry) configs.get(ObservationRegistry.class.getName());
    }

}

Consider the following consumer-side instrumentation code that reuses the handler:

TestObservationRegistry registry = TestObservationRegistry.create();

// Consumer side...
// In Micrometer Tracing we would have predefined
// PropagatingReceiverTracingObservationHandler but for the sake of this demo we
// create our own handler that takes the "foo" header's value and sets it as a low
// cardinality key "received foo header"
registry.observationConfig().observationHandler(new HeaderReadingHandler());

Properties consumerConfigs = new Properties();
consumerConfigs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaContainer.getBootstrapServers());
consumerConfigs.put(ConsumerConfig.GROUP_ID_CONFIG, "test-" + UUID.randomUUID());
consumerConfigs.put(ObservationRegistry.class.getName(), registry);
consumerConfigs.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
        "io.micrometer.docs.observation.messaging.ConsumerInterceptorConfig");
consumerConfigs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
Consumer<String, String> consumer = new KafkaConsumer<>(consumerConfigs, new StringDeserializer(),
        new StringDeserializer());

// Consumer subscribes to the topic
consumer.subscribe(Collections.singletonList(topic));

// Consumer polls for a message
consumer.poll(Duration.ofMillis(1000));

Let’s look at the assertions after having sent and received a message. We should have two observations, one on the consumer side and one on the producer side, each with the key values set by its handler.

TestObservationRegistryAssert.assertThat(registry)
    .hasObservationWithNameEqualTo("kafka.send")
    .that()
    .hasBeenStarted()
    .hasBeenStopped()
    .hasLowCardinalityKeyValue("sent", "true");

TestObservationRegistryAssert.assertThat(registry)
    .hasObservationWithNameEqualTo("kafka.receive")
    .that()
    .hasBeenStarted()
    .hasBeenStopped()
    .hasLowCardinalityKeyValue("received foo header", "bar");

Instrumentation of Thread Switching Components

We might want to create an Observation around a Runnable or Callable that we’re submitting through an Executor. For that to work, we need to know if there was an Observation in the parent thread that the new thread should continue or for which a child Observation should be created.

Consider the following example:

// Example of an Executor Service
ExecutorService executor = Executors.newCachedThreadPool();


// This snippet shows an example of how to wrap in an observation code that would
// be executed in a separate thread

// Let's assume that we have a parent observation
Observation parent = Observation.createNotStarted("parent", registry);
// Observation is put in scope via the <observe()> method
Future<Boolean> child = parent.observe(() -> {
    // [Thread 1] Current Observation is the same as <parent>
    then(registry.getCurrentObservation()).isSameAs(parent);
    // [Thread 1] We're wrapping the executor in a Context Propagating version.
    // <ContextExecutorService> comes from Context Propagation library
    return ContextExecutorService.wrap(executor).submit(() -> {
        // [Thread 2] Current Observation is same as <parent> - context got
        // propagated
        then(registry.getCurrentObservation()).isSameAs(parent);
        // Wraps the code that should be run in a separate thread in an
        // observation
        return Observation.createNotStarted("child", registry).observe(this::yourCodeToMeasure);
    });
});
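
To make the mechanics of the wrapped executor more tangible, the following JDK-only sketch shows the general capture-and-restore idea: a thread local value is read in the submitting thread, set in the worker thread for the duration of the task, and then restored. The ThreadLocalPropagationSketch class, its CURRENT thread local (standing in for "the current Observation"), and the propagating helper are hypothetical; this is not how ContextExecutorService is implemented, only an illustration of the concept.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// JDK-only sketch with hypothetical names; not the Context Propagation library.
class ThreadLocalPropagationSketch {

    // Stand-in for "the current Observation" kept in a thread local
    static final ThreadLocal<String> CURRENT = new ThreadLocal<>();

    // Captures the caller's value when the task is wrapped and restores the
    // worker thread's previous value once the task has finished
    static <T> Callable<T> propagating(Callable<T> task) {
        String captured = CURRENT.get();
        return () -> {
            String previous = CURRENT.get();
            CURRENT.set(captured);
            try {
                return task.call();
            }
            finally {
                if (previous == null) {
                    CURRENT.remove();
                }
                else {
                    CURRENT.set(previous);
                }
            }
        };
    }

    // Returns what the worker thread sees without and with propagation
    static String[] runDemo() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        CURRENT.set("parent");
        try {
            Callable<String> readCurrent = () -> String.valueOf(CURRENT.get());
            String plain = executor.submit(readCurrent).get();
            String wrapped = executor.submit(propagating(readCurrent)).get();
            return new String[] { plain, wrapped };
        }
        catch (Exception ex) {
            throw new IllegalStateException(ex);
        }
        finally {
            CURRENT.remove();
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        String[] seen = runDemo();
        System.out.println("without propagation: " + seen[0]);
        System.out.println("with propagation: " + seen[1]);
    }

}
```

Without the wrapper the worker thread sees no value, which is why a plain executor would lose the parent Observation.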

Instrumentation of Reactive Libraries

In this section, we discuss how to wrap Reactive libraries in Observations and how to use Reactor Context to safely propagate Observations between threads.

For Reactor 3.5.3 and After

In the Reactor 3.5.3 release (through this PR), an option to turn on automated context propagation was added. To use this, ensure that you use the following projects at minimum in the following versions:

To use the feature, call Reactor’s new Hooks method (for example, in your public static void main method), like this:

Hooks.enableAutomaticContextPropagation();

This automatically wraps Reactor’s internal mechanisms to propagate context between operators, threads, and so on. Usage of tap and handle or the Context Propagation API is not required.

Consider the following example:

// This snippet shows an example of how to use the new Hook API with Reactor
Hooks.enableAutomaticContextPropagation();
// Starting from Micrometer 1.10.8 you need to set your registry on this singleton
// instance of ObservationThreadLocalAccessor (OTLA)
ObservationThreadLocalAccessor.getInstance().setObservationRegistry(registry);

// Let's assume that we have a parent observation
Observation parent = Observation.start("parent", registry);
// Now we put it in thread local
parent.scoped(() -> {

    // Example of propagating whatever there was in thread local
    Integer block = Mono.just(1).publishOn(Schedulers.boundedElastic()).doOnNext(integer -> {
        log.info("Context Propagation happens - the <parent> observation gets propagated ["
                + registry.getCurrentObservation() + "]");
        then(registry.getCurrentObservation()).isSameAs(parent);
    })
        .flatMap(integer -> Mono.just(integer).map(monoInteger -> monoInteger + 1))
        .transformDeferredContextual((integerMono, contextView) -> integerMono.doOnNext(integer -> {
            log.info("Context Propagation happens - the <parent> observation gets propagated ["
                    + registry.getCurrentObservation() + "]");
            then(registry.getCurrentObservation()).isSameAs(parent);
        }))
        // Let's assume that we're modifying the context
        .contextWrite(context -> context.put("foo", "bar"))
        // Since we are NOT part of the Reactive Chain (e.g. this is not a
        // WebFlux application) you MUST call <contextCapture> to capture all
        // ThreadLocal values and store them in a Reactor Context.
        // ----------------------
        // If you were part of the Reactive Chain (e.g. returning Mono from
        // endpoint) there is NO NEED to call <contextCapture>. If you need to
        // propagate your e.g. Observation to the Publisher you just created
        // (e.g. Mono or Flux) please consider adding it to the Reactor Context
        // directly instead of opening an Observation scope and calling
        // <contextCapture> (see example below).
        .contextCapture()
        .block();

    // We're still using <parent> as current observation
    then(registry.getCurrentObservation()).isSameAs(parent);

    then(block).isEqualTo(2);

    // Now, we want to create a child observation for a Reactor stream and put it
    // to Reactor Context. Automatically its parent will be <parent> observation
    // since <parent> is in Thread Local
    Observation child = Observation.start("child", registry);
    block = Mono.just(1).publishOn(Schedulers.boundedElastic()).doOnNext(integer -> {
        log.info(
                "Context Propagation happens - the <child> observation from Reactor Context takes precedence over thread local <parent> observation ["
                        + registry.getCurrentObservation() + "]");
        then(registry.getCurrentObservation()).isSameAs(child);
    })
        .flatMap(integer -> Mono.just(integer).map(monoInteger -> monoInteger + 1))
        .transformDeferredContextual((integerMono, contextView) -> integerMono.doOnNext(integer -> {
            log.info(
                    "Context Propagation happens - the <child> observation from Reactor Context takes precedence over thread local <parent> observation ["
                            + registry.getCurrentObservation() + "]");
            then(registry.getCurrentObservation()).isSameAs(child);
        }))
        // Remember to stop the child Observation!
        .doFinally(signalType -> child.stop())
        // When using Reactor we ALWAYS search for
        // ObservationThreadLocalAccessor.KEY entry in the Reactor Context to
        // search for an Observation. You DON'T have to use <contextCapture>
        // because you have manually provided the ThreadLocalAccessor key
        .contextWrite(context -> context.put(ObservationThreadLocalAccessor.KEY, child))
        .block();

    // We're back to having <parent> as current observation
    then(registry.getCurrentObservation()).isSameAs(parent);

    then(block).isEqualTo(2);
});

// There should be no remaining observation
then(registry.getCurrentObservation()).isNull();

// We need to stop the parent
parent.stop();

If the performance of this approach is not satisfactory, check whether disabling the hook and explicitly using handle or tap operators improves the performance.

Before Reactor 3.5.3

With Reactor, the preferred way to propagate elements through a Flux is not through ThreadLocal instances but through Reactor Context. Reactor, however, gives you two operators: tap() and handle(). If the Micrometer Context Propagation library is on the classpath, these two operators set the thread local values for you.

Consider the following example:

// This snippet shows an example of how to wrap code that is using Reactor

// Let's assume that we have a parent observation
Observation parent = Observation.start("parent", registry);

// We want to create a child observation for a Reactor stream
Observation child = Observation.createNotStarted("child", registry)
    // There's no thread local entry, so we will pass parent observation
    // manually. If we put the Observation in scope we could then call
    // <.contextCapture()> method from Reactor to capture all thread locals
    // and store them in Reactor Context.
    .parentObservation(parent)
    // We start the child only after setting the parent, so that handlers
    // already see the parent when they are notified about the start
    .start();
Integer block = Mono.just(1)
    // Example of not propagating context by default
    .doOnNext(integer -> {
        log.info(
                "No context propagation happens by default in Reactor - there will be no Observation in thread local here ["
                        + registry.getCurrentObservation() + "]");
        then(registry.getCurrentObservation()).isNull();
    })
    // Example of having entries in thread local for <tap()> operator
    .tap(() -> new DefaultSignalListener<Integer>() {
        @Override
        public void doFirst() throws Throwable {
            log.info("We're using tap() -> there will be Observation in thread local here ["
                    + registry.getCurrentObservation() + "]");
            then(registry.getCurrentObservation()).isNotNull();
        }
    })
    .flatMap(integer -> Mono.just(integer).map(monoInteger -> monoInteger + 1))
    // Example of retrieving ThreadLocal entries via ReactorContext
    .transformDeferredContextual((integerMono, contextView) -> integerMono.doOnNext(integer -> {
        try (ContextSnapshot.Scope scope = ContextSnapshot.setAllThreadLocalsFrom(contextView)) {
            log.info(
                    "We're retrieving thread locals from Reactor Context - there will be Observation in thread local here ["
                            + registry.getCurrentObservation() + "]");
            then(registry.getCurrentObservation()).isNotNull();
        }
    }))
    // Example of having entries in thread local for <handle()> operator
    .handle((BiConsumer<Integer, SynchronousSink<Integer>>) (integer, synchronousSink) -> {
        log.info("We're using handle() -> There will be Observation in thread local here ["
                + registry.getCurrentObservation() + "]");
        then(registry.getCurrentObservation()).isNotNull();
        synchronousSink.next(integer);
    })
    // Remember to stop the child Observation!
    .doFinally(signalType -> child.stop())
    // When using Reactor we ALWAYS search for
    // ObservationThreadLocalAccessor.KEY entry in the Reactor Context to
    // search for an Observation
    .contextWrite(context -> context.put(ObservationThreadLocalAccessor.KEY, child))
    // If there were ThreadLocal entries that are using Micrometer Context
    // Propagation they would be caught here. All implementations of
    // <ThreadLocalAccessor> will store their thread local entries under their
    // keys in Reactor Context
    .contextCapture()
    .block();

// We didn't have any observations in thread local
then(registry.getCurrentObservation()).isNull();

// We need to stop the parent
parent.stop();

then(block).isEqualTo(2);
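
The try-with-resources block around ContextSnapshot.setAllThreadLocalsFrom in the example above follows a common scope pattern: values are copied from a context into thread locals when the scope opens, and the previous state is restored when it closes. The following JDK-only sketch illustrates that pattern under simplified assumptions. The ScopedThreadLocalSketch class, its Scope interface, the KEY constant, and the map used as a context are hypothetical stand-ins; they are not the Context Propagation or Reactor API.

```java
import java.util.Map;

// JDK-only sketch with hypothetical names; not the Context Propagation library.
class ScopedThreadLocalSketch {

    // Hypothetical key under which the "observation" is stored in the context
    static final String KEY = "observation";

    // Stand-in for the thread local that holds the current Observation
    static final ThreadLocal<String> CURRENT = new ThreadLocal<>();

    // A scope whose close() does not throw checked exceptions
    interface Scope extends AutoCloseable {

        @Override
        void close();

    }

    // Copies the value stored under KEY into the thread local and returns a
    // scope that restores the previous value when closed
    static Scope setThreadLocalFrom(Map<String, String> context) {
        String previous = CURRENT.get();
        String value = context.get(KEY);
        if (value != null) {
            CURRENT.set(value);
        }
        return () -> {
            if (previous == null) {
                CURRENT.remove();
            }
            else {
                CURRENT.set(previous);
            }
        };
    }

    // Reads the thread local while the scope is open
    static String readInsideScope(Map<String, String> context) {
        try (Scope scope = setThreadLocalFrom(context)) {
            return CURRENT.get();
        }
    }

    public static void main(String[] args) {
        Map<String, String> context = Map.of(KEY, "child");
        System.out.println("inside scope: " + readInsideScope(context));
        System.out.println("after scope: " + CURRENT.get());
    }

}
```

Restoring the previous value on close, rather than simply clearing the thread local, is what allows such scopes to be nested safely.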