Heron processing topologies can be written using an API called the Heron ECO API. The ECO API is currently available to work with spouts and bolts from the following packages:

  • org.apache.storm

We will expand compatibility based on feedback we get from the community.

Although this document focuses on the ECO API, both the Streamlet API and Topology API topologies you have built can still be used with Heron

The Heron ECO API vs. The Streamlet and Topology APIs

Heron's ECO offers one major difference over the Streamlet and Topology APIs and that is extensibility without recompilation. With Heron's ECO developers now have a way to alter the way data flows through spouts and bolts without needing to get into their code and make changes. Topologies can now be defined through a YAML based format.

Why the name ECO?

/ˈekoʊ/ (Because all software should come with a pronunciation guide these days) ECO is an acronym that stands for:

  • Extensible
  • Component
  • Orchestrator

What about Storm Flux? Is it compatible with Eco?

ECO is an extension Flux. Most Storm Flux topologies should be able to deployed in Heron with minimal changes. Start reading Migrate Storm Topologies To Heron to learn how to migrate your Storm Flux topology then come back.

Getting started

In order to use the Heron ECO API for Java, you'll need to install the heron-api and the heron-storm library, which is available via Maven Central.

Maven setup

To install the heron-api library using Maven, add this to the dependencies block of your pom.xml configuration file:

<dependency>
    <groupId>com.twitter.heron</groupId>
    <artifactId>heron-api</artifactId>
    <version>0.17.5</version>
    <scope>compile</scope>
</dependency>
<dependency>
    <groupId>com.twitter.heron</groupId>
    <artifactId>heron-storm</artifactId>
    <version>0.17.5</version>
    <scope>compile</scope>
</dependency>

Compiling a JAR with dependencies

In order to run a Java topology in a Heron cluster, you'll need to package your topology as a "fat" JAR with dependencies included. You can use the Maven Assembly Plugin to generate JARs with dependencies. To install the plugin and add a Maven goal for a single JAR, add this to the plugins block in your pom.xml:

<plugin>
    <artifactId>maven-assembly-plugin</artifactId>
    <configuration>
        <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
        </descriptorRefs>
        <archive>
            <manifest>
                <mainClass></mainClass>
            </manifest>
        </archive>
    </configuration>
    <executions>
        <execution>
            <id>make-assembly</id>
            <phase>package</phase>
            <goals>
                <goal>single</goal>
            </goals>
        </execution>
    </executions>
</plugin>

Once your pom.xml is properly set up, you can compile the JAR with dependencies using this command:

$ mvn assembly:assembly

By default, this will add a JAR in your project's target folder with the name PROJECT-NAME-VERSION-jar-with-dependencies.jar. Here's an example ECO topology submission command using a compiled JAR:

$ heron submit local \
  target/my-project-1.2.3-jar-with-dependencies.jar \
  com.twitter.heron.eco.Eco \
  --eco-config-file path/to/your/topology-definition.yaml

Topology Name

Configuration

Components

Property Injection

The Topology Definition

Streams and Groupings

Handling Enums

Property Substitution

Environment Variable Substitution

Other ECO Examples

Notice how the above example submission command is referencing the main class com.twitter.heron.eco.Eco. This part of the command needs to stay the same. Eco is the main class that will assemble your topology from the --eco-config-file you specify.

Defining Your ECO Topology File

An ECO topology definition consists of the following:

  • A topology name
  • An optional list of topology "components" (named Java objects that will be made available for configuration in the topology)
  • A DSL topology definition that contains:
    • A list of spouts, each identified by a unique ID
    • A list of bolts, each identified by a unique ID
    • A list of "stream" objects representing a flow of tuples between spouts and bolts

An example of a simple YAML DSL definition is below:

name: "fibonacci-topology"

config:
  topology.workers: 1

components:
  - id: "property-holder"
    className: "com.twitter.heron.examples.eco.TestPropertyHolder"
    constructorArgs:
      - "some argument"
    properties:
      - name: "numberProperty"
        value: 11
      - name: "publicProperty"
        value: "This is public property"

spouts:
  - id: "spout-1"
    className: "com.twitter.heron.examples.eco.TestFibonacciSpout"
    constructorArgs:
      - ref: "property-holder"
    parallelism: 1

bolts:
  - id: "even-and-odd-bolt"
    className: "com.twitter.heron.examples.eco.EvenAndOddBolt"
    parallelism: 1

  - id: "ibasic-print-bolt"
    className: "com.twitter.heron.examples.eco.TestIBasicPrintBolt"
    parallelism: 1
    configMethods:
      - name: "sampleConfigurationMethod"
        args:
          - "${ecoPropertyOne}"
          - MB

  - id: "sys-out-bolt"
    className: "com.twitter.heron.examples.eco.TestPrintBolt"
    parallelism: 1

streams:
  - from: "spout-1"
    to: "even-and-odd-bolt"
    grouping:
      type: SHUFFLE

  - from: "even-and-odd-bolt"
    to: "ibasic-print-bolt"
    grouping:
      type: SHUFFLE
      streamId: "odds"

  - from: "even-and-odd-bolt"
    to: "sys-out-bolt"
    grouping:
      type: SHUFFLE
      streamId: "evens"

If you want to stop here and try to deploy the above topology you can execute:

$ heron submit local \
  ~/.heron/examples/heron-eco-examples.jar \
  com.twitter.heron.eco.Eco \
  --eco-config-file ~/.heron/examples/fibonacci.yaml

This ECO topology does not do anything spectacular, but it's a good starting point to go through some of ECO's concepts.

Taking a closer look at the YAML definition specs

Topology Name

Each ECO definition file will be required to have a name defined.

name: "simple-wordcount-topology"

Configuration

config is the section where you will list your properties to be inserted into a com.twitter.heron.api.Config class. This section is optional.

config:
  topology.workers: 1

Specifying Component Level Resources

You can specify component level JVM resources by referencing the id of the component and its ram. You can choose between Bytes B, Megabytes MB, or Gigabytes GB. Examples would be 256MB or 2GB. The unit of measurement must be appended at the end of the numerical value with no spaces. There is plan to support component level cpu and disk configs in the future.

 topology.component.resourcemap:

    - id: "spout-1"
      ram: 256MB # The minimum value for a component's specified ram is 256MB
      

    - id: "bolt-1"
      ram: 256MB # The minimum value for a component's specified ram is 256MB
      

Specifying JVM Options

You can specify component level JVM resources by referencing the id of the component and a list of the JVM options

topology.component.jvmoptions:

   - id: "spout-1"
     options: ["-XX:NewSize=300m", "-Xms2g"]

Other Supported Configuration Parameters

  • "topology.worker.childopts" : Topology-specific options for the worker child process. This is used in addition to WORKER_CHILDOPTS
  • "topology.tick.tuple.freq.ms" : How often (in milliseconds) a tick tuple from the "system" component and "tick" stream should be sent to tasks. Meant to be used as a component-specific configuration.
  • "topology.enable.message.timeouts" : True if Heron should timeout messages or not. Defaults to true. This is meant to be used in unit tests to prevent tuples from being accidentally timed out during the test.
  • "topology.debug" : When set to true, Heron will log every message that's emitted.
  • "topology.stmgrs" : The number of stmgr instances that should spin up to service this topology. All the executors will be evenly shared by these stmgrs.
  • "topology.message.timeout.secs" : The maximum amount of time given to the topology to fully process a message emitted by a spout. If the message is not acked within this time frame, Heron will fail the message on the spout. Some spouts implementations will then replay the message at a later time.
  • "topology.component.parallelism" : The per component parallelism for a component in this topology.
  • "topology.max.spout.pending" : This config applies to individual tasks, not to spouts or topologies as a whole. A pending tuple is one that has been emitted from a spout but has not been acked or failed yet. Note that this config parameter has no effect for unreliable spouts that don't tag their tuples with a message id.
  • "topology.auto.task.hooks" : A list of task hooks that are automatically added to every spout and bolt in the topology. An example of when you'd do this is to add a hook that integrates with your internal monitoring system. These hooks are instantiated using the zero-arg constructor.
  • "topology.serializer.classname" : The serialization class that is used to serialize/deserialize tuples
  • "topology.reliability.mode" : A Heron topology can be run in any one of the TopologyReliabilityMode mode. The format of this flag is the string encoded values of the underlying TopologyReliabilityMode value. Values are ATMOST_ONCE, ATLEAST_ONCE, and EFFECTIVELY_ONCE.
  • "topology.reliability.mode" : A Heron topology can be run in any one of the TopologyReliabilityMode mode. The format of this flag is the string encoded values of the underlying TopologyReliabilityMode value.
  • "topology.container.cpu" : Number of cpu cores per container to be reserved for this topology.
  • "topology.container.ram" : Amount of ram per container to be reserved for this topology. In bytes.
  • "topology.container.disk" : Amount of disk per container to be reserved for this topology. In bytes.
  • "topology.container.max.cpu.hint" : Hint for max number of cpu cores per container to be reserved for this topology.
  • "topology.container.max.ram.hint" : Hint for max amount of ram per container to be reserved for this topology. In bytes.
  • "topology.container.max.disk.hint" : Hint for max amount of disk per container to be reserved for this topology. In bytes.
  • "topology.container.padding.percentage" : Hint for max amount of disk per container to be reserved for this topology. In bytes.
  • "topology.container.ram.padding" : Amount of ram to pad each container. In bytes.
  • "topology.stateful.checkpoint.interval.seconds" : What's the checkpoint interval for stateful topologies in seconds.
  • "topology.stateful.start.clean" : Boolean flag that says that the stateful topology should start from clean state, i.e. ignore any checkpoint state.
  • "topology.name" : Name of the topology. This config is automatically set by Heron when the topology is submitted.
  • "topology.team.name" : Name of the team which owns this topology.
  • "topology.team.email" : Email of the team which owns this topology.
  • "topology.cap.ticket" : Cap ticket (if filed) for the topology. If the topology is in prod this has to be set or it cannot be deployed.
  • "topology.project.name" : Project name of the topology, to help us with tagging which topologies are part of which project. For example, if topology A and Topology B are part of the same project, we will like to aggregate them as part of the same project. This is required by Cap team.
  • "topology.additional.classpath" : Any user defined classpath that needs to be passed to instances should be set in to config through this key. The value will be of the format "cp1:cp2:cp3..."
  • "topology.update.deactivate.wait.secs" : Amount of time to wait after deactivating a topology before updating it
  • "topology.update.reactivate.wait.secs" : fter updating a topology, amount of time to wait for it to come back up before reactivating it
  • "topology.environment" : Topology-specific environment properties to be added to an Heron instance. This is added to the existing environment (that of the Heron instance). This variable contains Map
  • "topology.timer.events" : Timer events registered for a topology. This is a Map>. Where the key is the name and the value contains the frequency of the event and the task to run.
  • "topology.remote.debugging.enable" : Enable Remote debugging for java heron instances
  • "topology.droptuples.upon.backpressure" : Do we want to drop tuples instead of initiating Spout BackPressure
  • "topology.component.output.bps" : The per component output bytes per second in this topology

Components

components are a list of instances that would be used as configuration objects for other components in your ECO file defined in the YAML DSL. The properties that are required for each component instance are id and className. id can be any name you choose, className is the fully qualified className of the Java class. The id field is used to identify the component for injection in the coming spouts and bolts defined in the topology. constructorArgs is only needed if a component has constructor that requires arguments. We will get into constructor args the in the next section. components are optional.

components:
  - id: "property-holder"
    className: "com.twitter.heron.examples.eco.TestPropertyHolder"
    constructorArgs:
      - "some argument"

Property Injection

Constructor Injection

constructorArgs can specify any object type. Above you can see that the only constructor argument specified is a string that contained "some argument". If declaring a number, you may omit the parenthesis.

 constructorArgs:
      - "some argument"
      - 123.45

The is also a way to reference other components as arguments by using ref. In the example below we are specifying an already defined component to be a constructor argument. Any instance that is referenced by ref must have already been defined in the ECO definition file before it is to be used.

constructorArgs:
      - ref: "property-holder"

Setter and Public Field Injection

Besides constructor injection, you may also use setter methods. In the below example, ECO will take inspect the component for setters that match the names and values provided. If no setter is defined, it will then look for a public field to set the property.

properties:
      - name: "numberProperty"
        value: 11
      - name: "publicProperty"
        value: "This is public property"

The Topology Definition

Spouts and Bolts each have their own sections for defining in the ECO file. They are extensions of components so they will be allowed the same property injection methods above. One difference is the parallelism property they contain, it sets the parallelism property for each bolt or spout once the topology has been deployed into Heron.

Spouts

spouts:
  - id: "spout-1"
    className: "com.twitter.heron.examples.eco.TestFibonacciSpout"
    constructorArgs:
      - ref: "property-holder"
    parallelism: 1

Bolts

bolts:
  - id: "even-and-odd-bolt"
    className: "com.twitter.heron.examples.eco.EvenAndOddBolt"
    parallelism: 1

Streams and Groupings

Streams are what connect your bolts and spouts together. Stream Groupings are the specific way you are to connect those streams to the spouts and bolts.

A Stream can contain the following fields.
  • from - references the id of the component where data is coming from
  • to - references the id of the component where the data is going
  • grouping - This is grouping definition of how this stream will connect the two afore mentioned components together
A grouping can contain the following fields
  • type - The type of grouping.
    • SHUFFLE
    • FIELDS
    • ALL
    • GLOBAL
    • NONE
    • CUSTOM
  • args is specific to the FIELDS grouping type. You would specify this as a list like so ["arg1", "arg2"]
  • customClass if you wanted to create a custom grouping, you could specify the fully qualified class name here

In the below example, you can see the first Stream Definition declares that data will flow from spout-1 to even-and-odd-bolt with a grouping type of SHUFFLE

streams:
  - from: "spout-1"
    to: "even-and-odd-bolt"
    grouping:
      type: SHUFFLE

  - from: "even-and-odd-bolt"
    to: "ibasic-print-bolt"
    grouping:
      type: SHUFFLE
      streamId: "odds"

  - from: "even-and-odd-bolt"
    to: "sys-out-bolt"
    grouping:
      type: SHUFFLE
      streamId: "evens"

Handling Enums

The usage of Enums is supported in ECO. You can use enums in constructor arguments, references, configuration methods, and properties In the fibonacci-topology referenced above. In the examples we reference the enum TestUnits. The enum is shown below.

public enum TestUnits {
  MB("MB"),
  GB("GB"),
  B("B");

  String value;

  TestUnits(String value) {
    this.value = value;
  }

  public String getValue() {
    return value;
  }
}

In the ibasic-print-bolt a configMethod is specified with the name sampleConfigurationMethod.

  - id: "ibasic-print-bolt"
    ... excluded for simplicity
        args:
          - "someStringArgument"
          - MB

This is the same as calling the following java method

public void sampleConfigurationMethod(String someProperty, TestUnits TestUnits) {
    this.someProperty += someProperty;
    this.TestUnits = TestUnits;
  }
  

Property Substitution

It's always nice to be able to define properties based on the environments you are in. We haven't forgotten this with ECO. You are able to substitute values into your ECO files by either environment variables or a properties file.
To turn on property substitution add the flag --props and specify a path to a .properties file.

To start over run

$ heron kill local fibonacci-topology

After the topology has been killed, execute:

$ heron submit local \
  ~/.heron/examples/heron-eco-examples.jar \
  com.twitter.heron.eco.Eco \
  --eco-config-file ~/.heron/examples/fibonacci.yaml --props ~/.heron/examples/sample.properties

If you look above at the yaml file snippet at the beginning of the page you will see

- id: "ibasic-print-bolt"
    className: "com.twitter.heron.examples.eco.TestIBasicPrintBolt"
    parallelism: 1
    configMethods:
      - name: "sampleConfigurationMethod"
        args:
          - "${ecoPropertyOne}"
          - MB

In the sample.properties we have a key value set at ecoPropertyOne=thisValueWasSetFromAPropertiesFile. You can check the logs for the ibasic-print-bolt and see the values are printing out.

Environment Variable Substitution

ECO also allows you to do environment variable substitution. To activate environment variable substitution pass the flag --env-props upon submitting a topology. If you have SOME_VARIABLE defined you can reference in your yaml file like below.

${ENV-SOME_VARIABLE}

Other ECO examples

Run the simple wordcount example

$ heron submit local \
  ~/.heron/examples/heron-eco-examples.jar \
   com.twitter.heron.eco.Eco \
   --eco-config-file ~/.heron/examples/simple_wordcount.yaml 

Run the simple windowing example

$ heron submit local \
   ~/.heron/examples/heron-eco-examples.jar \
   com.twitter.heron.eco.Eco \
   --eco-config-file ~/.heron/examples/simple_windowing.yaml