Data pipelines using Dagster — Key Challenges I Encountered
Dagster is a decent orchestrator. It has a nice web interface, and the data-asset perspective is a good one. However, there are areas that hurt when implementing a production-grade solution with Dagster. Here I try to capture the pain points I faced.
Though Dagster has been around for many years, its GitHub stars have not grown in proportion to its promised potential. I strongly believe this is because of Dagster's highly opinionated implementation. It is easy to start using Dagster. However, as the problem gets complicated, we end up thinking about how to solve an already-solved problem within Dagster's constraints, spending more and more time on Dagster's internals. Ideally I would like to spend that time solving the actual problem instead.
LLM-assisted coding or Vibe coding
If you rely on LLM-assisted coding or vibe coding, I found Dagster too hard to work with. I tried LLMs like Claude, Gemini, and GitHub Copilot. They generate plausible answers that look like how good code should look, but those answers are invalid in a real-life Dagster flow. LLMs are not trained well on Dagster, mainly because there are too few in-depth working examples to train on. In comparison, Airflow gets you much better results when pair programming with LLMs. Adding fuel to the fire, Dagster keeps changing things between minor releases, so the latest version may have a better way of achieving something that the LLMs have no knowledge of.
The responses LLMs generate for data-pipeline use cases feel natural and match how a programmer would normally think. Dagster, on the other hand, uses odd method names and parameters that follow a non-standard way of thinking, and the API keeps changing. So an LLM might produce a response for an older Dagster version that is no longer valid in a newer one.
Documentation
For fresh starters, Dagster has done a great job on documentation. However, when we work on enterprise-grade problems, there are many areas where the documentation is outdated relative to the latest code, and no proper examples are available for intricate flows. We can see developers venting about the documentation on the GitHub issues page.
Chaining jobs to orchestrate a higher-order job
Chaining existing jobs to orchestrate a higher-order job is a common need when building composable data pipelines. Here we break a bigger data flow into smaller, meaningful pipelines, each of which is useful on its own. At the same time, for different needs we may build a higher-order job that picks and chooses which child jobs to run.
A few of the Dagster threads that discuss this problem:
- How to properly trigger a job once other are done?
- Chaining Jobs More Easily
- Simplify Sequential and/or Concurrent job scheduling
At the moment, the Dagster team feels sensors are the best solution for this (sketched below). Out of my frustration while solving these problems, I feel the Dagster dev team is detached from reality and real production needs. Just imagine having many jobs and trying to build a chaining job: one sensor per link in the chain, plus conditional logic (was this run triggered by the chained job or on its own?), and it slowly becomes too complex. When simple needs get unnecessarily complicated, it's a signal that the underlying layers are not done right.
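For illustration, here is a minimal sketch of the sensor-based chaining pattern, with two trivial placeholder jobs standing in for real pipelines; every additional link in the chain needs another sensor like this:

import dagster as dg

@dg.op
def noop():
    pass

# placeholder jobs standing in for two real pipelines
@dg.job
def job_a():
    noop()

@dg.job
def job_b():
    noop()

@dg.run_status_sensor(
    run_status=dg.DagsterRunStatus.SUCCESS,
    monitored_jobs=[job_a],
    request_job=job_b,
)
def run_job_b_after_job_a(context: dg.RunStatusSensorContext):
    # Fires after every successful run of job_a, including runs that were
    # never meant to be part of the chained flow; the "was this triggered
    # from the chained job or not" filtering logic is left to us.
    return dg.RunRequest(run_key=context.dagster_run.run_id)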
Dagster config parameters for a pipeline run
To pass parameters to a pipeline run, we must use pydantic models based on dagster.Config. This is a straightforward need, and the Dagster team has complicated it by supporting only their blessed set of types. When we start a project, this doesn't hurt. Once the pipelines grow and we have needs like triggering pipelines from external microservices, with payloads containing nested objects, we end up spending time not on the business problem but on figuring out how to make it work in Dagster. This is a red flag.
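To illustrate the constraint (as I understand it, with hypothetical model names): nested fields must themselves be Dagster-aware Config classes, so a plain pydantic.BaseModel payload arriving from another service cannot be dropped in directly:

import dagster as dg

# Nested models must also subclass dg.Config; a plain pydantic.BaseModel
# in their place is rejected by dagster's config schema inference.
class CallbackConfig(dg.Config):
    url: str
    retries: int = 3

class PipelineRunConfig(dg.Config):
    table_name: str
    callback: CallbackConfig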
Here is another scenario: we had a common utils library that also holds these config models, so we could build value-added utilities around them. Now we have to add dagster as a dependency of that library unnecessarily. These could have been pure pydantic base models or plain old Python objects.
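One workaround to consider, a sketch with hypothetical module names rather than an official pattern: keep pure pydantic models in the shared library and maintain a thin dg.Config mirror on the Dagster side.

# shared_lib/models.py -- pure pydantic, no dagster dependency
from pydantic import BaseModel

class TableJobParams(BaseModel):
    table_name: str
    batch_size: int = 1000

# dagster_project/config.py -- thin dagster-side mirror, kept in sync by hand
import dagster as dg

from shared_lib.models import TableJobParams

class TableJobConfig(dg.Config):
    table_name: str
    batch_size: int = 1000

    def to_params(self) -> TableJobParams:
        # dg.Config is pydantic-based, so model_dump() works on pydantic v2
        return TableJobParams(**self.model_dump())

The cost is duplicating every field definition by hand, which is exactly the kind of busywork the framework should not force on us.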
Dagster is not a good solution for event-driven data flow — period
At a high level, it may feel like Dagster's sensors can solve event-driven needs. But once we start doing more development, we realize how painful it is to accomplish anything. The same event-driven need would be straightforward in other workflow orchestrators like Temporal. Under the hood, a Dagster sensor is a scheduled polling mechanism, a sugar coat on top of its scheduler. Systems that genuinely promise event-driven workflows use a pub-sub strategy; Temporal is an example.
Dagster creates confusing, complex spaghetti with sensors, which must be tied to a job, which in turn has a relationship with an asset selection. I am stopping there to keep the sentence digestible. Every time I add an event-driven flow using a Dagster sensor (see the sketch below), it is a pain.
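To make the polling nature concrete, here is a minimal sketch (the job, op, and list_new_s3_keys helper are hypothetical):

import dagster as dg

def list_new_s3_keys(since):
    # hypothetical helper: ask S3 for keys newer than the cursor
    return []

@dg.op
def ingest_file():
    pass

@dg.job
def process_new_files():
    ingest_file()

@dg.sensor(job=process_new_files, minimum_interval_seconds=30)
def s3_file_sensor(context: dg.SensorEvaluationContext):
    # Despite the event-driven framing, this body is invoked on a polling
    # interval and must itself ask the external system whether anything
    # new happened, tracking its own cursor between evaluations.
    new_keys = list_new_s3_keys(since=context.cursor)
    for key in new_keys:
        yield dg.RunRequest(run_key=key)
    if new_keys:
        context.update_cursor(new_keys[-1])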
Post-actions for a job have a lot of restrictions
Background: an asset job is a job that works with a collection of data assets. In Apache Airflow terms, it is like a job that iterates over a list of tables and performs ELT on each.
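For instance, an asset job can be defined over an asset selection (a sketch with a hypothetical asset group name):

import dagster as dg

daily_tables_job = dg.define_asset_job(
    name="daily_tables_job",
    # select every asset in a hypothetical "daily_tables" group
    selection=dg.AssetSelection.groups("daily_tables"),
)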
While working with data-pipeline jobs, we encounter many custom needs, such as performing some action on the failed assets of an asset job.
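The approach the community converged on boils down to a run-failure sensor roughly along these lines (a sketch continuing the asset-job example above; the post-action is hypothetical):

import dagster as dg

@dg.run_failure_sensor(monitored_jobs=[daily_tables_job])
def act_on_failed_assets(context: dg.RunFailureSensorContext):
    # Map the failed run's step failure events back to asset keys;
    # asset_key is None for steps that do not materialize an asset.
    failed_assets = [
        event.asset_key
        for event in context.get_step_failure_events()
        if event.asset_key is not None
    ]
    for key in failed_assets:
        # hypothetical post-action: alerting, cleanup, re-queueing, ...
        context.log.info(f"asset failed: {key.to_user_string()}")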
The above is the current state of how to solve this (as of July 2025). There is no way to know it from the Dagster docs; after a few days of searching we might narrow down to this gist. Even then it may not satisfy our needs, but it helps us toward a workaround.
Related discussion: Get failed asset from dagster run with @run_status_sensor/@run_failure_sensor
Customizing log format
This is a common need. However, it is painful to customize the log format in Dagster.
Dagster docs on this:
- https://docs.dagster.io/guides/monitor/logging/custom-logging
- https://docs.dagster.io/guides/monitor/logging/python-logging
However, if we start down this path, we will go in circles: some logs get formatted while Dagster's internal logs do not. For jobs we can configure logger definitions, but they do not work as expected.
Here are some threads related to logging concerns:
- Standardizing logging configuration in Dagster
- Default logger from Definitions is not picked up
- Change logger behavior so that setting a default logger in a Definitions object changes
- Structured / json logging
If we use Datadog or Elasticsearch for log management, emitting logs as JSON is a pain.
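For reference, a custom JSON console logger can be written close to the docs' custom-logging example and attached as a default logger (a sketch; the emitted fields are my choice), and even then framework-internal log lines still bypass it:

import json
import logging

import dagster as dg

@dg.logger(description="JSON-lines console logger (sketch)")
def json_console_logger(init_context):
    # A plain stdlib logger whose records are serialized as one JSON
    # object per line, the shape Datadog / Elasticsearch ingest nicely.
    logger_ = logging.getLoggerClass()("json_console_logger", level=logging.INFO)

    class JsonFormatter(logging.Formatter):
        def format(self, record):
            return json.dumps({
                "timestamp": self.formatTime(record),
                "level": record.levelname,
                "name": record.name,
                "message": record.getMessage(),
            })

    handler = logging.StreamHandler()
    handler.setFormatter(JsonFormatter())
    logger_.addHandler(handler)
    return logger_

# Registering it as a default logger, which (per the threads above) is
# exactly where the behavior gets inconsistent:
defs = dg.Definitions(loggers={"console": json_console_logger})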
Dagster’s error stacktrace is a pain in the neck
Here is a sample error message. Try to guess the root cause and where it breaks:
2025-05-28 10:42:58 +0530 - dagster - WARNING - /Users/sairam/data-pipeline/.venv/lib/python3.12/site-packages/dagster/_core/workspace/context.py:824: UserWarning: Error loading repository location example:dagster._core.errors.DagsterUserCodeLoadError: Error occurred during the loading of Dagster definitions in
executable_path=/Users/sairam/data-pipeline/.venv/bin/python, module_name=example.definitions, working_directory=/Users/sairam/data-pipeline
Stack Trace:
File "/Users/sairam/data-pipeline/.venv/lib/python3.12/site-packages/dagster/_grpc/server.py", line 420, in __init__
self._loaded_repositories: Optional[LoadedRepositories] = LoadedRepositories(
^^^^^^^^^^^^^^^^^^^
File "/Users/sairam/data-pipeline/.venv/lib/python3.12/site-packages/dagster/_grpc/server.py", line 242, in __init__
with user_code_error_boundary(
^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/sairam/.pyenv/versions/3.12.9/lib/python3.12/contextlib.py", line 158, in __exit__
self.gen.throw(value)
File "/Users/sairam/data-pipeline/.venv/lib/python3.12/site-packages/dagster/_core/errors.py", line 299, in user_code_error_boundary
raise new_error from e
The above exception was caused by the following exception:
ValueError: not enough values to unpack (expected 2, got 0)
Stack Trace:
File "/Users/sairam/data-pipeline/.venv/lib/python3.12/site-packages/dagster/_core/errors.py", line 289, in user_code_error_boundary
yield
File "/Users/sairam/data-pipeline/.venv/lib/python3.12/site-packages/dagster/_grpc/server.py", line 253, in __init__
loadable_targets = get_loadable_targets(
^^^^^^^^^^^^^^^^^^^^^
File "/Users/sairam/data-pipeline/.venv/lib/python3.12/site-packages/dagster/_grpc/utils.py", line 51, in get_loadable_targets
else loadable_targets_from_python_module(module_name, working_directory)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/sairam/data-pipeline/.venv/lib/python3.12/site-packages/dagster/_core/workspace/autodiscovery.py", line 33, in loadable_targets_from_python_module
module = load_python_module(
^^^^^^^^^^^^^^^^^^^
File "/Users/sairam/data-pipeline/.venv/lib/python3.12/site-packages/dagster/_core/code_pointer.py", line 135, in load_python_module
return importlib.import_module(module_name)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/sairam/.pyenv/versions/3.12.9/lib/python3.12/importlib/__init__.py", line 90, in import_module
return _bootstrap._gcd_import(name[level:], package, level)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "<frozen importlib._bootstrap>", line 1387, in _gcd_import
File "<frozen importlib._bootstrap>", line 1360, in _find_and_load
File "<frozen importlib._bootstrap>", line 1331, in _find_and_load_unlocked
File "<frozen importlib._bootstrap>", line 935, in _load_unlocked
File "<frozen importlib._bootstrap_external>", line 999, in exec_module
File "<frozen importlib._bootstrap>", line 488, in _call_with_frames_removed
File "/Users/sairam/data-pipeline/example/definitions.py", line 14, in <module>
from example.s3_data.assets.data_import_assets import data_imports
File "/Users/sairam/data-pipeline/example/s3_data/assets/data_import_assets.py", line 27, in <module>
@dg.asset(
^^^^^^^^^
File "/Users/sairam/data-pipeline/.venv/lib/python3.12/site-packages/dagster/_core/definitions/decorators/asset_decorator.py", line 336, in inner
return create_assets_def_from_fn_and_decorator_args(args, fn)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/sairam/data-pipeline/.venv/lib/python3.12/site-packages/dagster/_core/definitions/decorators/asset_decorator.py", line 535, in create_assets_def_from_fn_and_decorator_args
return builder.create_assets_definition()
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/sairam/data-pipeline/.venv/lib/python3.12/site-packages/dagster/_core/definitions/decorators/decorator_assets_definition_builder.py", line 576, in create_assets_definition
node_def=self.create_op_definition(),
^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/sairam/data-pipeline/.venv/lib/python3.12/site-packages/dagster/_core/definitions/decorators/decorator_assets_definition_builder.py", line 556, in create_op_definition
return _Op(
^^^^
File "/Users/sairam/data-pipeline/.venv/lib/python3.12/site-packages/dagster/_core/definitions/decorators/op_decorator.py", line 104, in __call__
self.config_schema = infer_schema_from_config_annotation(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/sairam/data-pipeline/.venv/lib/python3.12/site-packages/dagster/_config/pythonic_config/conversion_utils.py", line 329, in infer_schema_from_config_annotation
return infer_schema_from_config_class(model_cls)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/sairam/data-pipeline/.venv/lib/python3.12/site-packages/dagster/_config/pythonic_config/config.py", line 446, in infer_schema_from_config_class
fields[resolved_field_name] = _convert_pydantic_field(
^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/sairam/data-pipeline/.venv/lib/python3.12/site-packages/dagster/_config/pythonic_config/conversion_utils.py", line 120, in _convert_pydantic_field
config_type = _config_type_for_type_on_pydantic_field(field_type)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/sairam/data-pipeline/.venv/lib/python3.12/site-packages/dagster/_config/pythonic_config/conversion_utils.py", line 187, in _config_type_for_type_on_pydantic_field
return Noneable(_config_type_for_type_on_pydantic_field(optional_inner_type))
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/sairam/data-pipeline/.venv/lib/python3.12/site-packages/dagster/_config/pythonic_config/conversion_utils.py", line 191, in _config_type_for_type_on_pydantic_field
key_type, value_type = get_args(potential_dagster_type)
^^^^^^^^^^^^^^^^^^^^
warnings.warn(f"Error loading repository location {location_name}:{error.to_string()}")
2025-05-28 10:08:56 +0530 - dagster - WARNING - /Users/sairam/data-pipeline/.venv/lib/python3.12/site-packages/dagster/_core/workspace/context.py:824: UserWarning: Error loading repository location example:dagster._core.errors.DagsterUserCodeUnreachableError: Could not reach user code server. gRPC Error code: UNAVAILABLE
Stack Trace:
File "/Users/sairam/data-pipeline/.venv/lib/python3.12/site-packages/dagster/_core/workspace/context.py", line 819, in _load_location
else origin.create_location(self.instance)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/sairam/data-pipeline/.venv/lib/python3.12/site-packages/dagster/_core/remote_representation/origin.py", line 370, in create_location
return GrpcServerCodeLocation(self, instance=instance)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/sairam/data-pipeline/.venv/lib/python3.12/site-packages/dagster/_core/remote_representation/code_location.py", line 697, in __init__
list_repositories_response = sync_list_repositories_grpc(self.client)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/sairam/data-pipeline/.venv/lib/python3.12/site-packages/dagster/_api/list_repositories.py", line 20, in sync_list_repositories_grpc
api_client.list_repositories(),
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/sairam/data-pipeline/.venv/lib/python3.12/site-packages/dagster/_grpc/client.py", line 328, in list_repositories
res = self._query("ListRepositories", dagster_api_pb2.ListRepositoriesRequest)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/sairam/data-pipeline/.venv/lib/python3.12/site-packages/dagster/_grpc/client.py", line 205, in _query
self._raise_grpc_exception(
File "/Users/sairam/data-pipeline/.venv/lib/python3.12/site-packages/dagster/_grpc/client.py", line 188, in _raise_grpc_exception
raise DagsterUserCodeUnreachableError(
The above exception was caused by the following exception:
grpc._channel._InactiveRpcError: <_InactiveRpcError of RPC that terminated with:
status = StatusCode.UNAVAILABLE
details = "failed to connect to all addresses; last error: UNKNOWN: unix:/var/folders/s_/5wlbny_s0txbc72zbyb0hgk40000gn/T/tmpea23135c: connect: No such file or directory (2)"
debug_error_string = "UNKNOWN:Error received from peer {created_time:"2025-05-28T10:08:56.015615+05:30", grpc_status:14, grpc_message:"failed to connect to all addresses; last error: UNKNOWN: unix:/var/folders/s_/5wlbny_s0txbc72zbyb0hgk40000gn/T/tmpea23135c: connect: No such file or directory (2)"}"
>
Stack Trace:
File "/Users/sairam/data-pipeline/.venv/lib/python3.12/site-packages/dagster/_grpc/client.py", line 203, in _query
return self._get_response(method, request=request_type(**kwargs), timeout=timeout)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/sairam/data-pipeline/.venv/lib/python3.12/site-packages/dagster/_grpc/client.py", line 163, in _get_response
return getattr(stub, method)(request, metadata=self._metadata, timeout=timeout)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/sairam/data-pipeline/.venv/lib/python3.12/site-packages/grpc/_channel.py", line 1181, in __call__
return _end_unary_response_blocking(state, call, False, None)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/sairam/data-pipeline/.venv/lib/python3.12/site-packages/grpc/_channel.py", line 1006, in _end_unary_response_blocking
raise _InactiveRpcError(state) # pytype: disable=not-instantiable
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
The above exception occurred during handling of the following exception:
dagster._core.errors.DagsterUserCodeUnreachableError: Could not reach user code server. gRPC Error code: UNKNOWN
Stack Trace:
File "/Users/sairam/data-pipeline/.venv/lib/python3.12/site-packages/dagster/_grpc/server_watcher.py", line 120, in watch_grpc_server_thread
watch_for_changes()
File "/Users/sairam/data-pipeline/.venv/lib/python3.12/site-packages/dagster/_grpc/server_watcher.py", line 83, in watch_for_changes
new_server_id = client.get_server_id(timeout=REQUEST_TIMEOUT)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/sairam/data-pipeline/.venv/lib/python3.12/site-packages/dagster/_grpc/client.py", line 311, in get_server_id
res = self._query("GetServerId", dagster_api_pb2.Empty, timeout=timeout)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/sairam/data-pipeline/.venv/lib/python3.12/site-packages/dagster/_grpc/client.py", line 205, in _query
self._raise_grpc_exception(
File "/Users/sairam/data-pipeline/.venv/lib/python3.12/site-packages/dagster/_grpc/client.py", line 188, in _raise_grpc_exception
raise DagsterUserCodeUnreachableError(
The above exception was caused by the following exception:
grpc._channel._InactiveRpcError: <_InactiveRpcError of RPC that terminated with:
status = StatusCode.UNKNOWN
details = "Exception calling application: <_InactiveRpcError of RPC that terminated with:
status = StatusCode.UNAVAILABLE
details = "failed to connect to all addresses; last error: UNKNOWN: unix:/var/folders/s_/5wlbny_s0txbc72zbyb0hgk40000gn/T/tmpf8mbb64t: connect: No such file or directory (2)"
debug_error_string = "UNKNOWN:Error received from peer {grpc_message:"failed to connect to all addresses; last error: UNKNOWN: unix:/var/folders/s_/5wlbny_s0txbc72zbyb0hgk40000gn/T/tmpf8mbb64t: connect: No such file or directory (2)", grpc_status:14, created_time:"2025-05-28T10:08:45.946049+05:30"}"
>"
debug_error_string = "UNKNOWN:Error received from peer {created_time:"2025-05-28T10:08:45.946262+05:30", grpc_status:2, grpc_message:"Exception calling application: <_InactiveRpcError of RPC that terminated with:\n\tstatus = StatusCode.UNAVAILABLE\n\tdetails = \"failed to connect to all addresses; last error: UNKNOWN: unix:/var/folders/s_/5wlbny_s0txbc72zbyb0hgk40000gn/T/tmpf8mbb64t: connect: No such file or directory (2)\"\n\tdebug_error_string = \"UNKNOWN:Error received from peer {grpc_message:\"failed to connect to all addresses; last error: UNKNOWN: unix:/var/folders/s_/5wlbny_s0txbc72zbyb0hgk40000gn/T/tmpf8mbb64t: connect: No such file or directory (2)\", grpc_status:14, created_time:\"2025-05-28T10:08:45.946049+05:30\"}\"\n>"}"
>
Stack Trace:
File "/Users/sairam/data-pipeline/.venv/lib/python3.12/site-packages/dagster/_grpc/client.py", line 203, in _query
return self._get_response(method, request=request_type(**kwargs), timeout=timeout)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/sairam/data-pipeline/.venv/lib/python3.12/site-packages/dagster/_grpc/client.py", line 163, in _get_response
return getattr(stub, method)(request, metadata=self._metadata, timeout=timeout)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/sairam/data-pipeline/.venv/lib/python3.12/site-packages/grpc/_channel.py", line 1181, in __call__
return _end_unary_response_blocking(state, call, False, None)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/sairam/data-pipeline/.venv/lib/python3.12/site-packages/grpc/_channel.py", line 1006, in _end_unary_response_blocking
raise _InactiveRpcError(state) # pytype: disable=not-instantiable
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
warnings.warn(f"Error loading repository location {location_name}:{error.to_string()}")Python’s generic error stack trace provides a clean way of knowing where & what broke. However, since dagster adds multiple layers on top when things break the stacktrace is not developer friendly. The root cause of the failure is there in the above the stacktrace. However it’s not straightforward.
For the above exception stacktrace, the root cause is :
from typing import Dict, Optional

import dagster as dg

class MyCustomConfig(dg.Config):
    table_name: str
    callback: Optional[Dict] = None  # the offending field: a bare Dict

Here, a bare Dict cannot be used in a Dagster Config object; it must carry typing information, such as Dict[str, str]. As we can see, the reason for the error, where it happened, and the line number are hard to work out from the stack trace.
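The one-line fix, for reference:

from typing import Dict, Optional

import dagster as dg

class MyCustomConfig(dg.Config):
    table_name: str
    callback: Optional[Dict[str, str]] = None  # parameterized, so schema inference works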
In the latest releases, the Dagster team provides a CLI tool called dg, whose check command tries to produce more developer-friendly error messages. Still, this command-line tool does not address all runtime error messages.
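For example, assuming the current CLI syntax (which has itself been changing between releases):

# validate that definitions load, without starting the web server
dg check defs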
[Cloud-hosted version] Sensors not available in branch deployments
We can't use sensors in branch deployments. There are ways to test sensors, but the full flow cannot be run on a branch deployment. In scenarios where branch deployments are crucial, this becomes a problem; for example, when branch deployments play a critical role in building the development/staging/testing environment data flows. We have to adopt a cumbersome workaround to solve this.
If we use the self-hosted open-source version, however, we can work around this problem and use sensors with dedicated Dagster instances per environment.
[Cloud-hosted version] User limit of 3 in the hosted Starter plan
In hosted Dagster, we have only a few paid options.
Among these, for enterprise usage, the Starter plan is what we get; the Pro plan is custom tailored and comes with a huge price tag. From $100/month, we can't immediately jump to four figures or more without a really compelling need. The Starter plan has so many restrictions that it makes you think — do I really need Dagster? A few examples:
- The user limit of 3 on the Starter plan is far too low.
- If we use a hybrid option where the compute resources are hosted by us, there is no reduced pricing. This is a bad pricing model.
