New in version 3.0.0.
Topologies
Storm topologies are described as a Directed Acyclic Graph (DAG) of Storm components, namely bolts and spouts.
Topology DSL
To simplify the process of creating Storm topologies, streamparse features a Python Topology DSL. It lets you specify topologies as complex as those you can build in Java or Clojure, but in concise, readable Python.
Topology files are located in the topologies directory in your streamparse project folder.
There can be any number of topology files for your project in this directory.
- topologies/my_topology.py
- topologies/my_other_topology.py
- topologies/my_third_topology.py
- …
A valid Topology may only have Bolt and Spout attributes.
Simple Python Example
The first step in putting together a topology is creating the bolts and spouts, so let’s assume we have the following bolt and spout:
from collections import Counter

from redis import StrictRedis

from streamparse import Bolt


class WordCountBolt(Bolt):
    outputs = ['word', 'count']

    def initialize(self, conf, ctx):
        self.counter = Counter()
        self.total = 0

    def _increment(self, word, inc_by):
        self.counter[word] += inc_by
        self.total += inc_by

    def process(self, tup):
        word = tup.values[0]
        # "dog" is deliberately weighted 10x; every other word counts once.
        self._increment(word, 10 if word == "dog" else 1)
        if self.total % 1000 == 0:
            self.logger.info("counted %i words", self.total)
        self.emit([word, self.counter[word]])


class RedisWordCountBolt(Bolt):
    def initialize(self, conf, ctx):
        ...  # the rest of this bolt is not shown in this example

from itertools import cycle

from streamparse import Spout


class WordSpout(Spout):
    outputs = ['word']

    def initialize(self, stormconf, context):
        self.words = cycle(['dog', 'cat', 'zebra', 'elephant'])

    def next_tuple(self):
        word = next(self.words)
        self.emit([word])
One important thing to note is that we have added an outputs attribute to these classes, which specifies the names of the output fields that will be produced on their default streams. If we wanted to specify multiple streams, we could do that by specifying a list of Stream objects.
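To see what the outputs declarations mean in practice, here is a minimal plain-Python sketch (no Storm or streamparse involved) that replays the WordSpout and WordCountBolt logic above and labels each emitted tuple with the declared field names. The function simulate_word_count is hypothetical; it is only a way to check the counting logic and the field layout offline.

```python
from collections import Counter
from itertools import cycle, islice

# Field names copied from the outputs attributes of WordSpout and WordCountBolt.
SPOUT_OUTPUTS = ['word']
BOLT_OUTPUTS = ['word', 'count']


def simulate_word_count(n_tuples):
    """Replay WordSpout -> WordCountBolt in plain Python (no Storm involved).

    Returns the bolt's emitted tuples as dicts keyed by its declared outputs.
    """
    words = cycle(['dog', 'cat', 'zebra', 'elephant'])  # same source as WordSpout
    counter = Counter()
    emitted = []
    for value in islice(words, n_tuples):
        # What WordSpout emits on its default stream: one field named "word".
        spout_tuple = dict(zip(SPOUT_OUTPUTS, [value]))
        word = spout_tuple['word']
        # Same weighting as WordCountBolt._increment: "dog" counts 10, others 1.
        counter[word] += 10 if word == 'dog' else 1
        # What WordCountBolt emits: values lined up with ['word', 'count'].
        emitted.append(dict(zip(BOLT_OUTPUTS, [word, counter[word]])))
    return emitted


for row in simulate_word_count(5):
    print(row)
```

The fifth tuple is the second "dog", so its count is 20, which makes the 10x weighting easy to spot.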
Now let’s hook up the bolt to read from the spout:
"""
Word count topology (in memory)
"""

from streamparse import Grouping, Topology

from bolts import WordCountBolt
from spouts import WordSpout


class WordCount(Topology):
    word_spout = WordSpout.spec()
    count_bolt = WordCountBolt.spec(inputs={word_spout: Grouping.fields('word')},
                                    par=2)
Note
Your project’s src directory gets added to sys.path before your topology is imported, so you should use absolute imports based on that.
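The Note above can be illustrated with a small, self-contained sketch of the mechanism: put a project's src directory at the front of sys.path, then import a module by its absolute name. The helper import_from_src and the throwaway project layout are hypothetical; streamparse handles the sys.path step itself, so you would not write this in a real project.

```python
import importlib
import sys
import tempfile
from pathlib import Path


def import_from_src(project_root, module_name):
    """Hypothetical helper: import module_name with <project_root>/src on sys.path."""
    src_dir = str(Path(project_root) / "src")
    if src_dir not in sys.path:
        sys.path.insert(0, src_dir)  # the effect the Note describes
    importlib.invalidate_caches()    # pick up files created after interpreter start
    return importlib.import_module(module_name)


with tempfile.TemporaryDirectory() as project_root:
    # Throwaway project layout: <project_root>/src/bolts.py
    src = Path(project_root) / "src"
    src.mkdir()
    (src / "bolts.py").write_text("class WordCountBolt:\n    pass\n")

    # Equivalent to "from bolts import WordCountBolt" in a topology file.
    bolts = import_from_src(project_root, "bolts")
    WordCountBolt = bolts.WordCountBolt

print(WordCountBolt.__name__)
```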
As you can see, the streamparse.Bolt.spec() and streamparse.Spout.spec() methods allow you to specify information about the components in your topology and how they connect to each other. Their respective docstrings outline all of the possible ways they can be used.
Java Components
The topology DSL fully supports JVM-based bolts and spouts via the JavaBolt and JavaSpout classes.
Here’s an example of how we would use a JVM-based spout, pixelcount.spouts.PixelSpout, as the source of a topology:
"""
Pixel count topology
"""

from streamparse import Grouping, JavaSpout, Topology

from bolts.pixel_count import PixelCounterBolt
from bolts.pixel_deserializer import PixelDeserializerBolt


class PixelCount(Topology):
    pixel_spout = JavaSpout.spec(name="pixel-spout",
                                 full_class_name="pixelcount.spouts.PixelSpout",
                                 args_list=[],
                                 outputs=["pixel"])
    pixel_deserializer = PixelDeserializerBolt.spec(name='pixel-deserializer-bolt',
                                                    inputs=[pixel_spout])
    pixel_counter = PixelCounterBolt.spec(name='pixel-count-bolt',
                                          inputs={pixel_deserializer:
                                                  Grouping.fields('url')},
                                          config={"topology.tick.tuple.freq.secs": 1})
One limitation of the Thrift interface we use to send the topology to Storm is that the constructors for Java components can only be passed basic Python data types: bool, bytes, float, int, and str.
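To catch this limitation before the topology is sent to Storm, you could validate args_list yourself. The sketch below is a hypothetical pre-flight check (streamparse does not provide check_java_args); it only enforces the type list stated above.

```python
# Types the text above says can be passed to Java constructors via args_list.
ALLOWED_JAVA_ARG_TYPES = (bool, bytes, float, int, str)


def check_java_args(args_list):
    """Hypothetical helper: raise TypeError for any non-basic constructor argument."""
    for position, arg in enumerate(args_list):
        if not isinstance(arg, ALLOWED_JAVA_ARG_TYPES):
            raise TypeError(
                "args_list[%d] is %s; only bool, bytes, float, int and str are supported"
                % (position, type(arg).__name__)
            )
    return list(args_list)


print(check_java_args(["pixels", 3, 0.5, True]))
```

Calling it on the list you pass as args_list to JavaSpout.spec() would surface a bad argument (for example a nested list or dict) at definition time rather than at submission time.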
Note
If you are passing strings as constructor arguments to Java components via the args_list parameter, you must use unicode literals to do so in Python 2. Otherwise, Storm will raise an exception.
Components in Other Languages
If you have components that are written in languages other than Java or Python, you can have those as part of your topology as well, assuming you’re using the corresponding multi-lang library for that language.
To do that, you just need to use the streamparse.ShellBolt.spec() and streamparse.ShellSpout.spec() methods. They take command and script arguments to specify a binary to run and its space-separated arguments.
Multiple Streams
To specify that a component has multiple output streams, instead of using a list of strings for outputs, you must specify a list of Stream objects, as shown below.
from streamparse import Spout, Stream


class FancySpout(Spout):
    outputs = [Stream(fields=['good_data'], name='default'),
               Stream(fields=['bad_data'], name='errors')]
To select one of those streams as the input for a downstream Bolt, you simply use [] to specify the stream you want. Without any stream specified, the default stream will be used.
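from streamparse import Topology

# ErrorBolt and ProcessBolt stand in for your own bolt classes.


class ExampleTopology(Topology):
    fancy_spout = FancySpout.spec()
    # Subscribes to the "errors" stream only
    error_bolt = ErrorBolt.spec(inputs=[fancy_spout['errors']])
    # No stream given, so this subscribes to the "default" stream
    process_bolt = ProcessBolt.spec(inputs=[fancy_spout])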
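The subscription rule just described (name a stream with [] or fall back to default) can be modeled in a few lines of plain Python. StreamRouter is a hypothetical toy, not part of streamparse, and it ignores groupings and parallelism; it only shows which downstream component receives which emitted tuples in ExampleTopology.

```python
from collections import defaultdict

DEFAULT_STREAM = "default"


class StreamRouter:
    """Toy model of named-stream subscriptions (not streamparse's API)."""

    def __init__(self):
        self._subscribers = defaultdict(list)  # stream name -> list of inboxes

    def subscribe(self, inbox, stream=DEFAULT_STREAM):
        # A subscriber that names no stream is attached to "default".
        self._subscribers[stream].append(inbox)

    def emit(self, values, stream=DEFAULT_STREAM):
        # Only subscribers of this exact stream receive the tuple.
        for inbox in self._subscribers[stream]:
            inbox.append(values)


router = StreamRouter()
error_inbox, process_inbox = [], []
router.subscribe(error_inbox, stream="errors")  # like fancy_spout['errors']
router.subscribe(process_inbox)                 # like fancy_spout (default)

router.emit(["ok record"])                          # default stream
router.emit(["malformed record"], stream="errors")  # errors stream
print(process_inbox, error_inbox)
```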
Groupings
By default, Storm uses a SHUFFLE grouping to route tuples to particular executors for a given component, but you can also specify other groupings by using the appropriate Grouping attribute. The most common grouping is probably the fields() grouping, which will send all the tuples with the same value for the specified fields to the same executor. This can be seen in the prototypical word count topology:
"""
Word count topology (in memory)
"""

from streamparse import Grouping, Topology

from bolts import WordCountBolt
from spouts import WordSpout


class WordCount(Topology):
    word_spout = WordSpout.spec()
    count_bolt = WordCountBolt.spec(inputs={word_spout: Grouping.fields('word')},
                                    par=2)
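The reason count_bolt uses a fields grouping is that par=2 gives it two bolt instances, each with its own in-memory Counter; if the same word could reach both, each would hold only a partial count. The sketch below models the routing decision in plain Python, assuming a hash-then-modulo scheme. fields_grouping_task is hypothetical, and zlib.crc32 stands in for Storm's own (Java) hashing, so real task assignments will differ.

```python
import zlib
from collections import Counter


def fields_grouping_task(values, field_names, group_fields, num_tasks):
    """Pick a task index for a tuple under a fields grouping (simplified model).

    Only the grouped fields feed the hash, so equal values always land on the
    same task. crc32 is used because it is stable across Python processes.
    """
    record = dict(zip(field_names, values))
    key = "\x1f".join(str(record[name]) for name in group_fields)
    return zlib.crc32(key.encode("utf-8")) % num_tasks


words = ["dog", "cat", "dog", "zebra", "dog", "cat"]
assignments = [fields_grouping_task([w], ["word"], ["word"], num_tasks=2) for w in words]

# One Counter per bolt instance; each ends up with complete counts for its words.
per_task = [Counter() for _ in range(2)]
for word, task in zip(words, assignments):
    per_task[task][word] += 1
print(per_task)
```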
Topology Cycles
On rare occasions, you may want to create a cyclical topology. This may not seem easily done with the current topology DSL, but there is a workaround you can use: manually declaring a temporary lower-level streamparse.thrift.GlobalStreamId that you can refer to in multiple places.
The following code creates a Topology with a cycle between its two Bolts.
from streamparse import Topology
from streamparse.thrift import GlobalStreamId

# SomeSpout, A, and B are placeholders for your own spout and bolt classes.

# Create a reference to B's output stream before we even declare the Topology.
# componentId must match B's component name, which is "B" because of name="B" below.
b_stream = GlobalStreamId(componentId='B', streamId='default')


class CyclicalTopology(Topology):
    some_spout = SomeSpout.spec()
    # Include our saved stream in the list of inputs for A
    a_bolt = A.spec(name="A", inputs=[some_spout, b_stream])
    # Have B get input from A like normal
    b_bolt = B.spec(name="B", inputs=[a_bolt])
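Because Storm topologies are normally described as a DAG, it can help to check which components actually form a cycle. The sketch below is a standard depth-first search over a plain dict mapping each component name to the components it reads from; find_cycle is a hypothetical helper, and the names mirror CyclicalTopology.

```python
def find_cycle(inputs):
    """Return one cycle (as a list of component names) in the graph, or None.

    `inputs` maps each component to the components it consumes from.
    """
    WHITE, GREY, BLACK = 0, 1, 2  # unvisited, on the current path, finished
    color = {node: WHITE for node in inputs}
    path = []

    def visit(node):
        color[node] = GREY
        path.append(node)
        for upstream in inputs.get(node, []):
            state = color.get(upstream, WHITE)
            if state == GREY:
                # Reached a node already on the path: that slice is the cycle.
                return path[path.index(upstream):]
            if state == WHITE:
                found = visit(upstream)
                if found:
                    return found
        path.pop()
        color[node] = BLACK
        return None

    for node in inputs:
        if color[node] == WHITE:
            found = visit(node)
            if found:
                return found
    return None


cyclical = {"some_spout": [], "A": ["some_spout", "B"], "B": ["A"]}
print(find_cycle(cyclical))
```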
Topology-Level Configuration
If you want to set a config option for all components in your topology, like topology.environment, you can do that by adding a config class attribute to your Topology that is a dict mapping from option names to their values. For example:
class WordCount(Topology):
    config = {'topology.environment': {'LD_LIBRARY_PATH': '/usr/local/lib/'}}
    ...
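A topology-level config applies to every component, while a component can still set individual keys of its own (as the per-bolt config= in the PixelCount topology does). The sketch below models that precedence as a simple dict merge, assuming component settings win. effective_component_config is hypothetical, the 10-second topology-level tick value is made up for illustration, and real Storm only honors a subset of keys at the component level.

```python
def effective_component_config(topology_config, component_config=None):
    """Simplified view of the settings one component sees (component keys win)."""
    merged = dict(topology_config)
    merged.update(component_config or {})
    return merged


topology_config = {
    'topology.environment': {'LD_LIBRARY_PATH': '/usr/local/lib/'},
    'topology.tick.tuple.freq.secs': 10,  # hypothetical topology-wide default
}

# A bolt that overrides the tick frequency, like pixel_counter in PixelCount.
bolt_view = effective_component_config(
    topology_config, {'topology.tick.tuple.freq.secs': 1})
print(bolt_view)
```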
Running Topologies
What Streamparse Does
When you run a topology either locally or by submitting to a cluster, streamparse will:
- Bundle all of your code into a JAR.
- Build a Thrift Topology struct out of your Python topology definition.
- Pass the Thrift Topology struct to Nimbus on your Storm cluster.
If you invoked streamparse with sparse run, your code is executed directly from the src/ directory.
If you submitted to a cluster with sparse submit, streamparse uses lein to compile the src directory into a jar file, which is run on the cluster. Lein uses the project.clj file located in the root of your project. This file is a standard lein project file and can be customized according to your needs.
Dealing With Errors
When detecting an error, bolt code can call its fail() method in order to have Storm call the respective spout’s fail() method. Known error/failure cases result in explicit callbacks to the spout using this approach.
Exceptions which propagate without being caught will cause the component to crash. On sparse run, the entire topology will stop execution. On a running cluster (i.e. sparse submit), Storm will auto-restart the crashed component and the spout will receive a fail() call.
If the spout’s fail handling logic is to hold back the tuple and not re-emit it, the topology keeps processing other tuples. If it re-emits the tuple, that tuple may crash the same component again. Whether the topology is tolerant of the failure depends on how you implement failure handling in your spout.
Common approaches are to:
- Append errant tuples to some sort of error log or queue for manual inspection later, while letting processing continue otherwise.
- Attempt 1 or 2 retries before considering the tuple a failure, if the error was likely a transient problem.
- Ignore the failed tuple, if appropriate to the application.
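The second approach above (a bounded number of retries, then giving up) comes down to per-tuple bookkeeping in the spout. The plain-Python sketch below shows that bookkeeping with hypothetical names (RetryTracker, register, on_ack, on_fail); in a real spout you would call on_fail() from the spout’s fail() callback, re-emit whatever it returns, and treat dead_letters as the error log from the first approach.

```python
class RetryTracker:
    """Bookkeeping for "retry a couple of times, then park the tuple" (hypothetical)."""

    def __init__(self, max_retries=2):
        self.max_retries = max_retries
        self.pending = {}       # tup_id -> tuple values still in flight
        self.attempts = {}      # tup_id -> number of failures seen so far
        self.dead_letters = []  # (tup_id, values) given up on, for manual inspection

    def register(self, tup_id, values):
        """Record a tuple when it is first emitted."""
        self.pending[tup_id] = values
        self.attempts.setdefault(tup_id, 0)

    def on_ack(self, tup_id):
        """Forget a tuple once it has been fully processed."""
        self.pending.pop(tup_id, None)
        self.attempts.pop(tup_id, None)

    def on_fail(self, tup_id):
        """Return the values to re-emit, or None if the tuple should not be retried."""
        values = self.pending.get(tup_id)
        if values is None:
            return None  # unknown or already-acked tuple
        self.attempts[tup_id] += 1
        if self.attempts[tup_id] <= self.max_retries:
            return values
        # Out of retries: log it and stop re-emitting so processing continues.
        self.dead_letters.append((tup_id, values))
        self.on_ack(tup_id)
        return None


tracker = RetryTracker(max_retries=2)
tracker.register("t1", ["dog"])
first = tracker.on_fail("t1")   # first retry
second = tracker.on_fail("t1")  # second retry
third = tracker.on_fail("t1")   # retries exhausted, parked in dead_letters
print(first, second, third, tracker.dead_letters)
```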
Parallelism and Workers
In general, use the par “parallelism hint” parameter on each spout and bolt spec in your topology definition to control the number of Python processes per component.
Reference: Understanding the Parallelism of a Storm Topology
Storm parallelism entities:
- A worker process is a JVM, i.e. a Java process.
- An executor is a thread that is spawned by a worker process.
- A task performs the actual data processing. (To simplify, you can think of it as a Python callable.)
Spout and bolt specs take a par keyword to provide a parallelism hint to Storm for the number of executors (threads) to use for the given spout/bolt; for example, par=2 is a hint to use two executors. Because streamparse implements spouts and bolts as independent Python processes, setting par=N results in N Python processes for the given spout/bolt.
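As a worked example of this arithmetic, the WordCount topology has word_spout with no par (assumed here to default to 1) and count_bolt with par=2, so it runs three Python processes in total. The sketch below computes that and spreads the executors round-robin over the default two workers; spread_executors is hypothetical, only approximates Storm’s scheduler, and leaves out acker executors.

```python
import itertools


def python_process_count(par_by_component):
    """With streamparse, each executor of a spout/bolt is one Python process."""
    return sum(par_by_component.values())


def spread_executors(par_by_component, num_workers):
    """Round-robin executors over worker JVMs (a simplification of Storm's scheduler)."""
    workers = [[] for _ in range(num_workers)]
    executors = (
        "%s[%d]" % (name, i)
        for name, par in par_by_component.items()
        for i in range(par)
    )
    for slot, executor in zip(itertools.cycle(range(num_workers)), executors):
        workers[slot].append(executor)
    return workers


pars = {"word_spout": 1, "count_bolt": 2}  # par values from the WordCount topology
print(python_process_count(pars))
print(spread_executors(pars, num_workers=2))
```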
Many streamparse applications will only need to set this parallelism hint to control the number of resulting Python processes when tuning streamparse configuration. For the underlying topology workers, streamparse sets a default of 2 workers, which are independent JVM processes for Storm. This allows a topology to continue running when one worker process dies; the other keeps running until the dead process restarts.
Both sparse run and sparse submit accept a -p N command-line flag to set the number of topology workers to N. For convenience, this flag also sets the number of Storm’s underlying messaging reliability acker bolts to the same N value. In the event that you need it (and you understand Storm ackers), use the -a and -w command-line flags instead of -p to control the number of acker bolts and the number of workers, respectively. The sparse command does not support Storm’s rebalancing features; use sparse submit -f -p N to kill the running topology and redeploy it with N workers.
Note that the underlying Storm thread implementation, LMAX Disruptor, is designed with high-performance inter-thread messaging as a goal, so when tuning your topology, rule out Python-level issues first:
- bottlenecks where the number of spout and bolt processes are out of balance
- serialization/deserialization overhead of more data emitted than you need
- slow routines/callables in your code
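For the serialization point, keep in mind that each tuple crossing between streamparse processes is encoded and decoded, by default as JSON over Storm’s multi-lang protocol (assumed here). A quick check is to compare the size of what you emit with what the next bolt actually reads. The record below is hypothetical.

```python
import json

# Hypothetical upstream record; suppose the next bolt only reads "url".
record = {
    "url": "https://example.com/some/page",
    "user_agent": "Mozilla/5.0 (X11; Linux x86_64)",
    "headers": {"accept": "text/html", "accept-language": "en-US"},
    "body": "x" * 2000,
}

# Tuple values are a list, so serialize a list in both cases.
full_payload = json.dumps([record]).encode("utf-8")            # emit the whole record
trimmed_payload = json.dumps([record["url"]]).encode("utf-8")  # emit only what is used

print("full:", len(full_payload), "bytes; trimmed:", len(trimmed_payload), "bytes")
```

Every byte saved here is saved twice per hop (once encoding, once decoding), and again for each downstream component that receives the tuple.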