Jonas Hanfland
11/30/2020, 10:37 AM
pending indefinitely. Not even the timeout provided to @task gets triggered.
When checking the logs for the mapped task in question, I found what seems to be an internal server error (see thread).
Rerunning it several times in the past produced the same issue (with the same exception) every time, except this Saturday, when it miraculously succeeded, only to fail again the next day.
Is that exception the cause of the problem? Is the issue on Prefect's side or on mine? Thanks in advance!
emre
11/30/2020, 10:55 AM
flow.run(executor=LocalDaskExecutor(scheduler="threads", num_workers=4)) and can execute tasks in parallel. The server registration is as follows:
flow.storage = Docker(....)
flow.run_config = DockerRun()
flow.executor = LocalDaskExecutor(scheduler="threads", num_workers=4)
flow.register(project_name=... , build=False)
The flow can run and communicate with Prefect Server, but the tasks are executed sequentially; both the Gantt chart and the logs show this.
Am I missing something? (prefect==0.13.13)
Ben
11/30/2020, 2:16 PM
run_config then.
Is that use case normal, or am I thinking about it wrong? I just want to double-check that I'm not attempting anything totally unidiomatic here.
matta
11/30/2020, 5:22 PM
matta
11/30/2020, 5:22 PM
from prefect import Flow, Parameter
from prefect.tasks.dbt import DbtShellTask
dbt_task = DbtShellTask(
    ...
)
with Flow("dbt-flow") as flow:
    dbt_command_string = Parameter('dbt_command_string', default='dbt run', required=True)
    dbt_commands = dbt_command_string.split(',')
    # one dbt_task call per comma-separated command
    for command in dbt_commands:
        dbt_task(
            command=command,
            ...
        )
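Because a Parameter is only a placeholder task while the flow is being built, a plain string method like .split(',') would not work on it directly; the split would have to happen inside a task at flow-run time, and the resulting list could then be fanned out with dbt_task.map(command=...). A minimal plain-Python sketch of that splitting step, assuming a comma-separated input; the name split_commands is hypothetical, and in the flow the function would be decorated with @task:

```python
from typing import List


def split_commands(command_string: str) -> List[str]:
    """Split a comma-separated string of dbt commands (hypothetical helper).

    In a Prefect 0.13 flow this would be decorated with @task so that it runs
    at flow-run time, when the Parameter has a concrete value.
    """
    # Strip whitespace around each command and drop empty entries,
    # e.g. the one produced by a trailing comma.
    return [part.strip() for part in command_string.split(",") if part.strip()]


if __name__ == "__main__":
    print(split_commands("dbt run, dbt test,"))  # ['dbt run', 'dbt test']
```

With this in place, commands = split_commands(dbt_command_string) followed by dbt_task.map(command=commands) would replace the for loop. Whether the mapped runs actually execute in parallel still depends on the flow's executor; the default LocalExecutor runs them one at a time.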
matta
11/30/2020, 5:23 PM
map that forces it to run serially?
matta
11/30/2020, 5:23 PM
matta
11/30/2020, 5:23 PM
matta
11/30/2020, 5:23 PM
John Grubb
11/30/2020, 7:25 PM
Unexpected error while running flow: KeyError('Task slug foo-3 not found in the current Flow; this is usually caused by changing the Flow without reregistering it with the Prefect API.')
It's always the same task in both pipelines: I've split out a common task that loads data to BigQuery into a separate file, and this is the task that fails. It runs fine for a few days and then randomly decides that it can't find this task, even though the exact same task is called two other times during each pipeline (foo-1 and foo-2). I'm wondering if I'm the first person to have their install randomly lose tasks...
josh
11/30/2020, 8:45 PM
0.13.18 has been released, and here are a few notable changes:
⁉️ Formatted GraphQL errors are now raised by the Client
👟 Refactored a few Client API calls for performance
📚 Various enhancements and fixes to tasks in the Task Library
A big thank you to our contributors who helped out with this release! Full changelog: https://github.com/PrefectHQ/prefect/releases/tag/0.13.18
Sam Luen-English
11/30/2020, 9:40 PM
FROM python:3.8
RUN python -m pip install sqlalchemy prefect
main.py:
from prefect import Flow, task
from prefect.environments.storage import Docker
import sqlalchemy
@task
def some_task():
    print(sqlalchemy.__version__)
with Flow("Some Flow") as flow:
    some_task()
if __name__ == "__main__":
    flow.storage = Docker(
        registry_url="docker.io",
        image_name="storage_image",
        image_tag="latest",
        base_image="test-base",
        local_image=True,
    )
    flow.register(project_name="Development")
Test script:
#!/usr/bin/env bash
set -x
docker build . -t test-base
#: Approach 1
python main.py
#: Approach 2
docker run -v /var/run/docker.sock:/var/run/docker.sock -v ~/.docker:/root/.docker -e PREFECT__CLOUD__AUTH_TOKEN="$PREFECT__CLOUD__AUTH_TOKEN" -v $(pwd)/main.py:/main.py test-base bash -c 'python /main.py'
Please could you guide me on the best approach?
Many thanks!
Pedro Machado
12/01/2020, 5:09 AM
start_date parameter that is not required. When the parameter is passed, we validate it and use it in the flow. When it's not passed, we use the flow's scheduled_start_time to derive the start date. This complicates the flow a little bit: I normally have to create a downstream task that takes the Parameter result and applies this logic.
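The downstream task described above could look roughly like the following plain-Python sketch. It assumes start_date arrives either as None or as an ISO YYYY-MM-DD string, and that the flow run's scheduled_start_time is passed in as a datetime; the function name resolve_start_date is hypothetical, and in a flow it would be wrapped with @task.

```python
from datetime import date, datetime
from typing import Optional


def resolve_start_date(start_date: Optional[str], scheduled_start_time: datetime) -> date:
    """Use the explicit start_date if one was passed, else derive it from the schedule."""
    if start_date is None:
        # Parameter not passed: fall back to the run's scheduled start time.
        return scheduled_start_time.date()
    # Parameter passed: validate it; date.fromisoformat raises ValueError
    # for strings that are not valid ISO dates.
    return date.fromisoformat(start_date)


if __name__ == "__main__":
    scheduled = datetime(2020, 12, 1, 5, 9)
    print(resolve_start_date(None, scheduled))          # 2020-12-01
    print(resolve_start_date("2020-11-15", scheduled))  # 2020-11-15
```

Keeping the fallback and the validation together in one small function is essentially what a callable default plus a validator on Parameter would encapsulate.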
I was thinking that it would be nice to be able to pass a callable as the default argument of the Parameter class to provide a value if the parameter is missing. Similarly, it would be nice to have a validator callable that could be used to validate the parameter value.
Is there another way to accomplish this? Thoughts?
Eric
12/01/2020, 6:55 AM
Sam Luen-English
12/01/2020, 1:03 PM
from prefect import Flow, task
from prefect.environments.storage import Docker
import sqlalchemy
@task
def some_task():
    print(sqlalchemy.__version__)
with Flow("test") as flow:
    some_task()
if __name__ == "__main__":
    flow.storage = Docker(
        registry_url="docker.io/atheon",
        image_name="prefect-test",
        base_image="atheon/prefect-test:base",
    )
    flow.register(project_name="Development")
karteekaddanki
12/01/2020, 3:05 PM
---> Running in 26ae91f84100
Beginning health checks...
System Version check: OK
Cloudpickle serialization check: OK
Result check: OK
Environment dependency check: OK
All health checks passed.
Removing intermediate container 26ae91f84100
---> a63da9840884
Traceback (most recent call last):
  File "register.py", line 60, in <module>
    storage.build()
  File "/home/karteek/anaconda3/envs/prefect/lib/python3.6/site-packages/prefect/environments/storage/docker.py", line 359, in build
    self._build_image(push=push)
  File "/home/karteek/anaconda3/envs/prefect/lib/python3.6/site-packages/prefect/environments/storage/docker.py", line 423, in _build_image
    self._parse_generator_output(output)
  File "/home/karteek/anaconda3/envs/prefect/lib/python3.6/site-packages/prefect/environments/storage/docker.py", line 661, in _parse_generator_output
    or parsed.get("errorDetail", {}).get("message")
AttributeError: 'NoneType' object has no attribute 'strip'
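One plausible reading of this traceback: Docker's build API streams JSON lines, and some of them (for example an aux line that carries the built image ID) contain neither a stream nor an errorDetail key, so a lookup like the one at line 661 evaluates to None before .strip() is called. The sketch below is not Prefect's code; parse_build_output is a hypothetical helper showing the same lookup made tolerant of such lines.

```python
import json
from typing import Iterable, List


def parse_build_output(lines: Iterable[str]) -> List[str]:
    """Collect messages from Docker build JSON lines (hypothetical helper).

    Lines that carry neither a 'stream' nor an 'errorDetail' message, such as
    an 'aux' line holding the image ID, are skipped instead of crashing.
    """
    messages = []
    for raw in lines:
        parsed = json.loads(raw)
        text = parsed.get("stream") or parsed.get("errorDetail", {}).get("message")
        if text is None:
            # Calling text.strip() here is what would raise
            # AttributeError: 'NoneType' object has no attribute 'strip'.
            continue
        text = text.strip()
        if text:
            messages.append(text)
    return messages


if __name__ == "__main__":
    sample = [
        '{"stream": "Step 1/2 : FROM python:3.8\\n"}',
        '{"aux": {"ID": "sha256:abc"}}',
    ]
    print(parse_build_output(sample))  # ['Step 1/2 : FROM python:3.8']
```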
Any thoughts on what might be happening here? I can confirm that the Docker image builds without any issues, and the logs show that all the health checks pass. I am using Python 3.6.
Charles Lariviere
12/01/2020, 3:41 PM
flow.run() (instead of flow.register()) and running with a local agent, everything works great. However, when I run the following script to register it in our Prefect Cloud account and execute it through the UI, it works (as in, the query gets executed), but I get the following error: Unexpected error: TypeError("can't pickle _thread.lock objects").
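One common way to hit this error is that an object holding a thread lock ends up being pickled, for example a task that returns a live database connection or cursor, or a task that captures a client created at module level. The plain-pickle sketch below uses a hypothetical open_connection stand-in (a dict holding a threading.Lock) to show the difference between passing connection settings around and passing a live connection object.

```python
import pickle
import threading


def can_pickle(obj) -> bool:
    """Return True if obj survives pickle.dumps, False if pickling raises TypeError."""
    try:
        pickle.dumps(obj)
        return True
    except TypeError:
        # Objects holding a thread lock fail here with a TypeError.
        return False


def open_connection(settings: dict) -> dict:
    """Hypothetical stand-in for a database client: a live connection holds a lock."""
    return {"settings": settings, "lock": threading.Lock()}


# Hypothetical connection settings: plain strings, safe to pass between tasks.
settings = {"account": "my-account", "warehouse": "my-warehouse"}

if __name__ == "__main__":
    print(can_pickle(settings))                   # True
    print(can_pickle(open_connection(settings)))  # False
```

In a flow this usually translates to passing settings (not clients) between tasks, opening the connection inside the task that uses it, and returning plain data such as rows or counts rather than the connection or cursor itself.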
My question is: what are the best practices for structuring these scripts? Once you've built the flow, should you always end it with flow.run() and flow.register()? Do you manually register flows when making changes but not include the register() method inside the actual script?
Sean Talia
12/01/2020, 5:26 PM
Andrey Tatarinov
12/01/2020, 6:06 PM
Andrew Hannigan
12/01/2020, 8:24 PM
Zach
12/01/2020, 9:38 PM
Pedro Machado
12/02/2020, 1:29 AM
Slackbot
12/02/2020, 11:54 AM
Alex Koay
12/02/2020, 1:29 PM
Rodrigo Ceballos
12/02/2020, 2:49 PM
Iason Andriopoulos
12/02/2020, 2:52 PM
Andrey Tatarinov
12/02/2020, 3:49 PM
Sean Talia
12/02/2020, 5:24 PM
DockerRun run config, Docker for storage) with labels, and the labels show up in the Prefect UI but not in the terminal output? For example, I execute:
with Flow(
"docker-flow", run_config=DockerRun(labels=["data-science"]), storage=Docker()
) as flow:
and then register the flow. The flow shows up in the UI with the data-science label, but the terminal output when registering the flow looks like:
Flow URL: <flow_url>
└── ID: <id>
└── Project: test-project
└── Labels: []
This led me to believe that the labels weren't being attached to the flow, but they are in fact there.
Anish Chhaparwal
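12/02/2020, 5:27 PM
from prefect import Flow, Parameter, task
from prefect.tasks.shell import ShellTask
git_clone = ShellTask(log_stderr=True)
@task
def clone_command(url, target):
    # hypothetical helper: format at run time, once the Parameter value is known
    return "git clone {url} {target}".format(url=url, target=target)
with Flow('QETL') as flow:
    git_url = Parameter("git_url", default="https://github.com/ieee8023/covid-chestxray-dataset")
    git_clone(command=clone_command(git_url, "covid-chestxray-dataset"))  # hypothetical target dir
flow.run()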
ale
12/02/2020, 6:24 PM