Discussion:
WorkerHook deserialization problem
Kevin Peek
2016-10-21 13:58:08 UTC
I am running into problems with WorkerHooks on a local cluster. Even using
only a BaseWorkerHook, I get an Exception. When I run the following code,
an EOFException is thrown - it seems the Worker is trying to deserialize an
empty byte[] for one of the WorkerHooks. Comment out the line adding the
hook and this runs fine.

Can someone help me understand what is going wrong here, and whether this is
strictly an issue with the LocalCluster and how I am using it?


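import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.hooks.BaseWorkerHook;
import org.apache.storm.topology.TopologyBuilder;

TopologyBuilder builder = new TopologyBuilder();
// RandomNumberSpout is a user-defined spout (not shown here).
builder.setSpout("spoutId", new RandomNumberSpout());
// Adding this hook triggers the exception; without this line the topology runs fine.
builder.addWorkerHook(new BaseWorkerHook());
StormTopology topology = builder.createTopology();
Config config = new Config();
config.setMessageTimeoutSecs(1);
String topologyName = "dummy-topology";

// Run briefly on a local cluster, then kill the topology and shut down.
LocalCluster cluster = new LocalCluster();
cluster.submitTopology(topologyName, config, topology);
Thread.sleep(5000);
cluster.killTopology(topologyName);
Thread.sleep(10000);
cluster.shutdown();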


Produces:


java.lang.RuntimeException: java.io.EOFException
at org.apache.storm.utils.Utils.javaDeserialize(Utils.java:185)
at org.apache.storm.daemon.worker$run_worker_shutdown_hooks$iter__8540__8544$fn__8545.invoke(worker.clj:576)
at clojure.lang.LazySeq.sval(LazySeq.java:40)
at clojure.lang.LazySeq.seq(LazySeq.java:49)
at clojure.lang.RT.seq(RT.java:507)
at clojure.core$seq__4128.invoke(core.clj:137)
at clojure.core$dorun.invoke(core.clj:3009)
at clojure.core$doall.invoke(core.clj:3025)
at org.apache.storm.daemon.worker$run_worker_shutdown_hooks.invoke(worker.clj:574)
at org.apache.storm.daemon.worker$fn__8555$exec_fn__2466__auto__$reify__8557$shutdown_STAR___8577.invoke(worker.clj:691)
at org.apache.storm.daemon.worker$fn__8555$exec_fn__2466__auto__$reify$reify__8603.shutdown(worker.clj:704)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at clojure.lang.Reflector.invokeMatchingMethod(Reflector.java:93)
at clojure.lang.Reflector.invokeNoArgInstanceMember(Reflector.java:313)
at org.apache.storm.process_simulator$kill_process.invoke(process_simulator.clj:46)
at org.apache.storm.daemon.supervisor$shutdown_worker.invoke(supervisor.clj:286)
at org.apache.storm.daemon.supervisor$fn__9307$exec_fn__2466__auto__$reify__9332.shutdown_all_workers(supervisor.clj:852)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at clojure.lang.Reflector.invokeMatchingMethod(Reflector.java:93)
at clojure.lang.Reflector.invokeNoArgInstanceMember(Reflector.java:313)
at org.apache.storm.testing$kill_local_storm_cluster.invoke(testing.clj:199)
at org.apache.storm.LocalCluster$_shutdown.invoke(LocalCluster.clj:66)
at org.apache.storm.LocalCluster.shutdown(Unknown Source)
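
The EOFException at the top of the trace is consistent with the guess that an empty byte[] is being deserialized. The following is only a minimal sketch using the plain JDK, assuming that Utils.javaDeserialize ultimately reads the bytes through a standard ObjectInputStream (an assumption, since Storm's own code is not shown in this thread). The class name EmptyPayloadDemo and its methods are hypothetical and are not part of Storm.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;

// Hypothetical demo class (not part of Storm): shows how standard Java
// deserialization reacts to a zero-length payload.
final class EmptyPayloadDemo {

    // Serialize an object with standard Java serialization.
    static byte[] serialize(Object obj) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Returns true if deserializing the payload fails with EOFException.
    static boolean failsWithEof(byte[] payload) {
        // The ObjectInputStream constructor reads the stream header first,
        // so a zero-length payload fails before readObject() is reached.
        try (ObjectInputStream in =
                 new ObjectInputStream(new ByteArrayInputStream(payload))) {
            in.readObject();
            return false;
        } catch (EOFException e) {
            return true;
        } catch (IOException | ClassNotFoundException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("empty payload -> EOF: " + failsWithEof(new byte[0]));
        System.out.println("real payload  -> EOF: " + failsWithEof(serialize("hook")));
    }
}
```

Running main reports true for the empty payload and false for the serialized string, which suggests the hook bytes reaching the worker at shutdown were empty rather than corrupted.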
P. Taylor Goetz
2016-10-28 20:15:05 UTC
I was able to verify this to be a bug in how worker hooks work in local mode.

In trying to see if this affects distributed mode as well, I found a more serious issue that prevents workers from shutting down gracefully (and thus prevents shutdown hooks from running):

https://issues.apache.org/jira/browse/STORM-2176

So for the time being I don’t believe worker shutdown hooks work in either local or distributed mode. I can confirm the start portion of worker hooks functions properly, but not shutdown. Hopefully we will be able to fix both these issues in an upcoming release.

-Taylor
Kevin Peek
2016-11-01 13:49:24 UTC
Thank you for the detailed response.