The Dapr SDK for Java: Pub/Sub & Distributed Tracing

· by Maarten Mulders · Read in about 9 min · (1745 Words)
Posted in java / open source / dapr / architecture

It’s been a while since the first two posts about Dapr! In those first installments, we looked at the basics of Dapr, from a very conceptual point of view. We also looked at the bare minimum HTTP API that Dapr exposes to the applications that use it. But writing enterprise applications like would be slow, and it would inevitably lead to mistakes. In this article, I will introduce you to a higher abstraction level of working with Dapr.

To refresh your memory, in the first installment of the series, I explained the basics of Dapr. In the second post, I’ve shown you how to manually integrate an existing Jakarta EE application with Dapr. As you may recall, this requires following the REST API documentation closely, implementing the HTTP client calls and carefully processing the responses. Of course it is perfectly possible to get the integration working this way, but it’s cumbersome and error-prone.

The Dapr SDK for Java

That’s why Dapr comes with SDK’s for various platforms, among them Java. The Dapr SDK for Java provides a higher abstraction level than plain HTTP calls. Add it to your project by including the following snippet in pom.xml:

<dependency>
    <groupId>io.dapr</groupId>
    <artifactId>dapr-sdk</artifactId>
    <version>1.3.1</version>
</dependency>

Image you want to store some Plain Old Java Object (POJO) in the state store. Now, rather than building those HTTP requests manually, you can use the SDK to write code like this:

var vehicleState = new VehicleState(licenseNumber, entryTimestamp);
daprClient.saveState("vehicles", vehicleState.licenseNumber(), vehicleState);

Similarly, to retrieve a POJO from the same state store:

var vehicleState = daprClient.getState("vehicles", licenseNumber, VehicleState.class)

Both samples come from my Dapr for Java workshop.

The DaprClient class that we use here delegates to an underlying implementation. There are currently two implementations possible, which leverage either gRPC or JSON over HTTP to do the actual communication with the Dapr sidecar.

Reactive Programming

The above snippets are not complete. Since the Dapr SDK for Java builds upon Project Reactor, all methods are asynchronous. This means they return a Mono<T> or a Flux<T>, respectively a single item or a stream of items somewhere in the future. Since both a Mono<T> and a Flux<T> are “cold”, you have to “subscribe” on them. By subscribing, you tell the code that will produce the items that you are ready to process the items that they will produce. One of the ways to do this is by invoking .block() on the Mono<T> or Flux<T>. That’s really the bare minimum of it, there’s a lot more to say about Project Reactor but that’s outside the scope for this post.

In case you aren’t familiar with reactive programming, I highly recommend you read the Introduction to Reactive Programming.

Spring Boot integration

The Dapr SDK for Java also has an optional module that provides even tighter integration with Spring Boot. In most cases, you will not need it when your application uses Spring Boot. There are two situations where it can be of use:

  1. If your application uses the Actor model. More on that in a next installment of this series.
  2. If your application subscribes to a pub/sub topic. More on that below.

One of the Dapr building blocks we didn’t cover until now is the Publish & subscribe (or pub/sub for short) building block. This building block lets your applications communicate using asynchronous messages, rather than the traditional request/response approach. Many people will say: “but asynchronous communication makes things incredibly harder; why would you do that?”. One of the powers of using pub/sub is that the producing or publishing side does not need to know upfront who will be receiving the messages. There could be one receiving application, or many, or none - the publisher doesn’t know and doesn’t care. This gives an enormous flexibility, especially in event-driven and message-driven environments. And let’s be honest: many real-world environments are, in fact, also event- or message driven. As humans, we often react to what’s happening around is as the events unfold…

Pub/sub in practice

The idea of this asynchronous messaging revolves around “topics”. Topics are a kind of channel where others (consumers) can subscribe to. When a publisher produces a message, the consumers that are subscribed each receive a copy of that message.

Publishing a message with Dapr is straightforward using the SDK:

daprClient.publishEvent(
    "pubsub-component-name",
    "speeding-violations",
    violation // some String or POJO with the payload
);

Again, you probably want to .block() on the outcome, or otherwise make sure the Mono runs. This code assumes that you already have configured a Publish & Subscribe building block, using e.g. Kafka, RabbitMQ or some cloud vendor specific implementation.

Now over to the consuming side: Dapr requires your consuming application to express a “subscription”, the fact that it wants to receive messages on a particular topic. When your application starts up, the Dapr sidecar will ask your application if it it wants to register a subscription by invoking a GET /dapr/subscribe. Of course, you could hand-craft a response on that endpoint that satisfies the contract. But it’s easier to include the Spring Boot add-on of the Dapr SDK by adding this snippet to your pom.xml:

<dependency>
    <groupId>io.dapr</groupId>
    <artifactId>dapr-sdk-springboot</artifactId>
    <version>1.3.1</version>
</dependency>

Now you can annotate an existing Spring controller with @Topic:

@Topic(name = "speeding-violations", pubsubName = "pubsub-component-name")
@PostMapping(path = "/collectfine")
public ResponseEntity<Void> registerViolation(
    @RequestBody final CloudEvent<SpeedingViolation> event
) {
    var violation = event.getData();
    violationProcessor.processSpeedingViolation(violation);
    return ResponseEntity.ok().build();
}

Note how this method takes a parameter of the generic type CloudEvent, which contains a few bits of metadata, and also exposes the original payload using its getData() method.

Distributed Tracing

But that is not all there is. If you work with distributed architectures, you will likely need traceability one day or another. Traceability means that you can “track & trace” traffic as it goes through your architecture.

As an example, imagine a distributed architecture for a web shop. HTTP requests from end users come in at a Front API. The Front API will delegate part of the business logic to an Product Catalogue API to see which products the shop sells. It also needs the Shopping Cart API to see if the customer has anything in their shopping cart. To have up-to-date information on product availability, the Product Catalogue API also checks the Inventory API to fetch actual product availability.

UserUserFront APIFront APIProduct Catalogue APIProduct Catalogue APIShopping Cart APIShopping Cart APIInventory APIInventory APIRequestFetch productsloop[For all products]Fetch product availabilityProduct availabilityProducts withactual availabilityFetch Shopping CartContents of Shopping CartShopping Cart contents,Products and Availability
Example distributed architecture for a Web Shop

Even though this example is relatively simple, it can be challenging to find performance bottlenecks. Also, troubleshooting individual failures can be cumbersome. Distributed tracing addresses these issues by correlating requests and responses on each component with upstream and downstream requests.

As an example, if the Product Catalogue API would log an error, distributed tracing would make it possible to see for which particular request the error was logged. Not only that, it would also allow to see what happened for that same user request in the Inventory API and the Front API.

Instrumentation

Tools like OpenTelemtry and Spring Cloud Sleuth can make it easier to set this up. Without too much work, they will instrument incoming and outgoing traffic with some additional metadata using HTTP headers or message properties. The applications can then report to a central system (e.g. Zipkin) what they did, how long it took, etc. Zipkin can then deliver visualisations like these:

Example of a Zipkin trace visualisation
Example of a Zipkin trace visualisation

If you use Dapr, you certainly want to have invocations of Dapr building blocks to be part of these visualisations. This of course applies to the Service Invocation Building Block, but also to State Management, Secret Management or Input/Output Bindings.

The automatic instrumentation that Open Telemetry or Sleuth provide you with can’t always help you, as they instrument Spring beans. Because the Dapr SDK is unaware of the Spring context it may (or may not) live in, its underlying gRPC or HTTP clients may not get instrumented. On the other hand, Dapr doesn’t know whether you actually use distributed tracing, and if so, which tool you use for it.

Connecting the dots

Luckily, we can connect those two worlds. As I mentioned before, the methods on the DaprClient return a “cold” Mono<T> or a Flux<T>. Before we subscribe on them, we can enrich the Reactor Context that the producing code will also have access to through its contextWrite method.

  • The Reactor Context looks a bit like java.util.Map, providing key/value mappings.
  • The contextWrite method takes one parameter of type java.util.function.Function, where both the only argument and the return type are reactor.util.context.Context.

In other words, contextWrite accepts a method that takes the existing Reactor Context, modifies it and return it. Reactor will invoke that function to enhance the Reactor Context, and then run the producing code (the code inside the Dapr SDK). The Dapr SDK will inspect the Reactor Context to see if there’s any tracing metadata present.

Enriching the Reactor Context with tracing metadata is dependent on the tracing tool that your application uses. Let me show you how to integrate the Dapr SDK with OpenTelemetry. Here’s an example of how you could read the tracing context from OpenTelemtry, and add it to the Reactor Context.

final reactor.util.context.Context context = ... // method argument of contextWrite(...)

final OpenTelemetry openTelemetry = ...          // Injected by Spring, or constructed manually

// Extract OpenTelemetry context into java.util.Map
final Map<String, String> map = new HashMap<>();
openTelemetry.getPropagators().getTextMapPropagator()
        .inject(io.opentelemetry.context.Context.current(), map, (carrier, key, value) -> {
            map.put(key, value);
        });

// Copy OpenTelemtry context back into Reactor Context
return context.putAll(Context.of(map).readOnly());

There may be a more efficient approach, but this code communicates its intent quite well.

Wrapping up

That’s it for this episode! We have seen how the Dapr SDK for Java makes it easier to work with Dapr for Java developers. We also had a glance at how to do asynchronous messaging using Daprs Publish & subscribe building block. And finally, we have seen how the SDK provides extension points for more advanced use cases like distributed tracing.

The the last episode (for now) will dive into another advanced topic: writing Actor-based applications using Dapr.

And remember, if you have any questions, ideas or suggestions for an additional post: let me hear from you! Feel free to reach out using the comments box below or any of the platforms in the sidebar.

Comments