Daniil Ponizov
12/06/2021, 4:46 PM

Hugo Shi
12/06/2021, 6:05 PM

Horatiu Bota
12/06/2021, 6:34 PM
s3result.write(overwrite=True)
)?

Theo Platt
12/06/2021, 7:25 PM

Isaac Brodsky
12/06/2021, 8:19 PM
dask.compute() using a Dask Executor? I'm trying to compare my upgraded (0.13.x to latest) flow code with a working example, and right now my compute calls hang.

Zane Selvans
12/06/2021, 8:36 PM

Leon Kozlowski
12/06/2021, 9:07 PM
When running prefect build, I am running into an error: mkdir: cannot create directory '/opt/prefect/': Permission denied. I am creating a user in my Dockerfile through useradd. It appears there are extra commands Prefect appends to the user-defined Dockerfile - is the root user required?
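
One possible angle (a sketch, assuming Prefect 1.x Docker storage; registry, image, and user names are illustrative, not from the thread): pre-create /opt/prefect as root in your own Dockerfile, e.g. RUN mkdir -p /opt/prefect && chown appuser /opt/prefect before USER appuser. Since mkdir -p is a no-op on an existing directory, the commands Prefect appends can then succeed without root. Then point storage at that Dockerfile:

from prefect import Flow
from prefect.storage import Docker

flow = Flow("example")  # placeholder flow
flow.storage = Docker(
    registry_url="registry.example.com",  # hypothetical
    image_name="my-flow-image",           # hypothetical
    dockerfile="Dockerfile",              # your user-defined Dockerfile
)
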
James McNeilis
12/06/2021, 9:59 PM
Failed to set task state with error: ClientError([{'path': ['set_task_run_states'], 'message': 'An unknown error occurred.', 'extensions': {'code': 'INTERNAL_SERVER_ERROR'}}])
Traceback (most recent call last):
File "/usr/local/lib/python3.8/site-packages/prefect/engine/cloud/task_runner.py", line 91, in call_runner_target_handlers
state = self.client.set_task_run_state(
File "/usr/local/lib/python3.8/site-packages/prefect/client/client.py", line 1917, in set_task_run_state
result = self.graphql(
File "/usr/local/lib/python3.8/site-packages/prefect/client/client.py", line 569, in graphql
raise ClientError(result["errors"])
prefect.exceptions.ClientError: [{'path': ['set_task_run_states'], 'message': 'An unknown error occurred.', 'extensions': {'code': 'INTERNAL_SERVER_ERROR'}}]
I've put the task and flow run URLs in the 🧵.

Alvaro Durán Tovar
12/06/2021, 10:23 PM
PREFECT__CLOUD__SEND_FLOW_RUN_LOGS=true
PREFECT__LOGGING__LOG_TO_CLOUD=true
I'm using @task(log_stdout=True) and I can see the message in Kubernetes itself. I can see the message "hey budy" in the log, but it is not present in the Cloud UI.
Am I missing something?
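
A minimal sketch of the setup being described (values illustrative; note that PREFECT__LOGGING__LOG_TO_CLOUD was deprecated in favor of PREFECT__CLOUD__SEND_FLOW_RUN_LOGS around 0.15.x, and the flag needs to reach the flow-run environment, not only the agent):

from prefect import task, Flow
from prefect.run_configs import KubernetesRun

@task(log_stdout=True)
def greet():
    # log_stdout routes print() output through the Prefect logger
    print("hey budy")

with Flow("log-stdout-demo") as flow:
    greet()

# ensure the flow-run pods themselves ship logs to Cloud
flow.run_config = KubernetesRun(env={"PREFECT__CLOUD__SEND_FLOW_RUN_LOGS": "true"})
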
Dominic Pham
12/07/2021, 12:06 AM

alins
12/07/2021, 11:35 AM

sarika gowtham
12/07/2021, 11:57 AM
executor = DaskExecutor(address="tcp://127.0.0.1:8787")
flow.run(executor=executor)

Zheng Xie
12/07/2021, 1:05 PM

Zheng Xie
12/07/2021, 1:05 PM

Tilak Maddy
12/07/2021, 1:21 PM
with Flow(...) as flow:
    a = first_task()
    b = second_task()
Say there are tasks which are defined and imported from other files (which in turn call other tasks, and so on). There is no way I can look at just this main file and tell how the entire flow is going to look: what dependencies all the tasks will have, retries, etc. Basically there isn't much info I can get here. So what are we doing here? (And why?)
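
For what it's worth, the assembled flow object can be introspected regardless of where its tasks were defined; a minimal sketch using Prefect 1.x's built-in inspection (visualize() requires graphviz):

flow.visualize()   # render the full task graph
print(flow.tasks)  # every task attached to the flow
print(flow.edges)  # the dependencies between them
for t in flow.tasks:
    print(t.name, t.max_retries, t.retry_delay)  # per-task settings
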
Ievgenii Martynenko
12/07/2021, 1:24 PM

Jason Motley
12/07/2021, 3:20 PM

John Shearer
12/07/2021, 6:16 PM
Would a flow run with PREFECT__FLOWS__CHECKPOINTING=false, but with checkpoint data present in the Prefect result directory, read from those results? I would expect this, but is this the current behaviour? (On my machine ...)

Erik Schomburg
12/07/2021, 6:31 PM
The task.map functionality doesn't quite work as I had expected. I'm aggregating a bunch of data sources and rows into a DataFrame, and previously this had been sped up by running it over subsets of keys, with each subset running in a different process. So in Prefect, I have a task that splits the keys into subsets, and then maps a task over those subsets. The problem is that there's a small probability of failure on each subset, due to connection timeouts. I have added some retry logic, but still want to make sure that successful sub-tasks have their results checkpointed, and unsuccessful ones are not. But the results = task.map(subset=subsets) code instead just stores the results in a single file, and then does not re-run unsuccessful sub-tasks. I tried adding {map_index} to the task target filename pattern (at first with doubled brackets, i.e., {{map_index}} 🤦).
Here's the basic flow:
all_keys = get_keys_task()
key_subsets = partition_keys_task(all_keys, n_subsets)
data_subsets = get_data_task.map(keys=key_subsets)
all_data = concatenate_subsets_task(data_subsets=data_subsets)
I know I can work around this by writing my own utility to create a list of tasks with their own unique task names, but it seems like part of the point of .map ought to be to do this sort of results management for you... Any tips? Maybe there's just a parameter in the prefect.task decorator or the task.map function I don't know about?
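
A sketch of one thing that may be missing (assuming Prefect 1.x target templating; the result dir and filename are illustrative): giving the mapped task a target templated on map_index makes each child checkpoint to its own file, so a re-run only re-executes children whose target is absent:

from datetime import timedelta
from prefect import task
from prefect.engine.results import LocalResult

@task(
    result=LocalResult(dir="./results"),    # illustrative location
    target="get_data-{map_index}.prefect",  # single braces: templated per mapped child
    max_retries=3,
    retry_delay=timedelta(seconds=10),
)
def get_data_task(keys):
    ...
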
Tony Yun
12/07/2021, 7:49 PM
@task(log_stdout=True)
def test():
    print(f'current folder: {Path.cwd()}')
    raise Exception('test exception')
Task 'test': Exception encountered during task execution!
Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/prefect/engine/task_runner.py", line 863, in get_task_run_state
logger=self.logger,
File "/usr/local/lib/python3.7/site-packages/prefect/utilities/executors.py", line 445, in run_task_with_timeout
return task.run(*args, **kwargs) # type: ignore
File "/Users/tonyyun/github/data_operations/dbt_automation/flow.py", line 296, in test
Exception: test exception
, where /Users/tonyyun/github shouldn't be there ^

Jeffery Newburn
12/07/2021, 8:34 PM

bitsofinfo
12/07/2021, 8:56 PM

Jason Motley
12/07/2021, 9:43 PM
if (select * from table) = error then (alter table add column) else somethingelse?
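
If the intent is "add the column only when a probe query fails", one hedged sketch outside any Prefect specifics (SQLAlchemy; table, column, and DSN are hypothetical):

import sqlalchemy
from sqlalchemy import text

engine = sqlalchemy.create_engine("postgresql://user:pass@host/db")  # hypothetical DSN

def ensure_column(engine):
    try:
        # probe: errors if new_col does not exist
        with engine.connect() as conn:
            conn.execute(text("SELECT new_col FROM my_table LIMIT 1"))
    except sqlalchemy.exc.ProgrammingError:
        # the "then" branch: add the missing column
        with engine.begin() as conn:
            conn.execute(text("ALTER TABLE my_table ADD COLUMN new_col TEXT"))
    else:
        pass  # the "else somethingelse" branch goes here
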
Billy McMonagle
12/07/2021, 10:45 PM

Emma Rizzi
12/08/2021, 8:11 AM
"A single agent can manage many concurrent flow runs"
-> Does this mean that if I schedule multiple flows at the same time for the same agent, the agent will be able to run them in parallel? Or does it queue the jobs?
Thanks

Joël Luijmes
12/08/2021, 9:28 AM
So far I have used the LocalDaskExecutor, as my requirements for concurrency were low. However, with this flow I want to use the real DaskExecutor.
As far as I'm aware, Prefect is capable of creating a temporary Dask cluster for the running flow (using KubeCluster), but alternatively it can use an existing deployed Dask cluster (or even Dask Gateway, if I'm not mistaken).
Note: I'm running in Kubernetes, so adaptive scaling could be interesting.
Are there any guidelines / suggestions / experiences for using a temporary vs. static Dask cluster? Additionally, if I have Docker as storage, how can I supply the correct image tag containing my modules/dependencies while registering the flow?
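
For the temporary-cluster case, a sketch (Prefect 1.x DaskExecutor driving dask-kubernetes; the pod spec and scaling bounds are illustrative):

from prefect.executors import DaskExecutor

# temporary per-flow-run cluster with adaptive scaling
flow.executor = DaskExecutor(
    cluster_class="dask_kubernetes.KubeCluster",
    cluster_kwargs={"pod_template": "worker-spec.yaml"},  # hypothetical pod spec
    adapt_kwargs={"minimum": 1, "maximum": 10},
)

# static/existing cluster instead: point the executor at its scheduler
# flow.executor = DaskExecutor(address="tcp://dask-scheduler.dask:8786")
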
Ivan Zaikin
12/08/2021, 10:00 AM
I'm trying to run prefect==2.0a5 in Docker.
Here is my Dockerfile:
FROM python:3.8
RUN adduser prefect
USER prefect
WORKDIR /home/prefect
COPY --chown=prefect:prefect requirements.in ./
ENV LANG C.UTF-8
ENV LC_ALL C.UTF-8
ENV PATH="/home/prefect/.local/bin:${PATH}"
RUN pip install --user --no-cache-dir -r requirements.in
COPY --chown=prefect:prefect flow.py flow_deployment.py ./
Inside the container I create a deployment and several flow runs, but all of them are marked as “late”. Here is the terminal output:
$ prefect orion start --host 0.0.0.0 --log-level DEBUG
Starting Orion API server...
INFO: Started server process [71]
INFO: Waiting for application startup.
09:54:06.189 | Scheduler service scheduled to start in-app
09:54:06.189 | MarkLateRuns service scheduled to start in-app
INFO: Application startup complete.
INFO: Uvicorn running on http://0.0.0.0:4200 (Press CTRL+C to quit)
09:54:06.501 | Finished monitoring for late runs.
09:54:06.538 | Scheduled 0 runs.
Starting agent connected to http://0.0.0.0:4200/api/...
Agent started! Checking for flow runs...
09:54:07.298 | Submitting flow run 'f0855bd3-2eab-4346-ad3a-2e237a688faa'
09:54:07.298 | Submitting flow run '521624eb-cec8-4f9b-9e92-f203e104586a'
09:54:07.298 | Submitting flow run 'f25da7b6-7893-4107-aaa3-df22377e2ccf'
09:54:07.299 | Submitting flow run '2895bdfa-082c-43b5-afc2-d0dcc269bf51'
09:54:07.299 | Submitting flow run '2b445dbd-58b4-4acb-89b7-1f6782dc0ec9'
09:54:07.300 | Completed submission of flow run 'f0855bd3-2eab-4346-ad3a-2e237a688faa'
09:54:07.300 | Completed submission of flow run '521624eb-cec8-4f9b-9e92-f203e104586a'
09:54:07.300 | Completed submission of flow run 'f25da7b6-7893-4107-aaa3-df22377e2ccf'
09:54:07.300 | Completed submission of flow run '2895bdfa-082c-43b5-afc2-d0dcc269bf51'
09:54:07.300 | Completed submission of flow run '2b445dbd-58b4-4acb-89b7-1f6782dc0ec9'
09:54:08.969 | Flow run '521624eb-cec8-4f9b-9e92-f203e104586a' exited with exception: KeyError('__main__')
09:54:08.975 | Flow run '2895bdfa-082c-43b5-afc2-d0dcc269bf51' exited with exception: KeyError('__main__')
09:54:08.976 | Flow run 'f25da7b6-7893-4107-aaa3-df22377e2ccf' exited with exception: KeyError('__main__')
09:54:08.979 | Flow run '2b445dbd-58b4-4acb-89b7-1f6782dc0ec9' exited with exception: KeyError('__main__')
09:54:08.980 | Flow run 'f0855bd3-2eab-4346-ad3a-2e237a688faa' exited with exception: KeyError('__main__')
Is there a way to debug these KeyErrors?
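
One guess, not a confirmed diagnosis: in the early Orion alphas, KeyError('__main__') on the agent side typically meant the flow being run was defined in __main__ and could not be re-imported. A sketch assuming the 2.0a5 DeploymentSpec API (name and path are illustrative), referencing the flow by file location so the agent imports it from flow.py rather than from __main__:

from prefect.deployments import DeploymentSpec

DeploymentSpec(
    name="example-deployment",               # hypothetical
    flow_location="/home/prefect/flow.py",   # import from file, not __main__
)
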
Ievgenii Martynenko
12/08/2021, 1:04 PM
We have a library (LIB) that logs via the root logger: logger = logging.getLogger(). The idea is to extend the Prefect Task class and run some magic using the LIB library. I've read that we can add as many NAMED loggers as we want using https://docs.prefect.io/core/concepts/logging.html#extra-loggers, but since we have the root one, what happens now is: Prefect logs its records as usual, but messages from LIB are not passed through. I suppose this is because the LIB logger is the root one.
Have you ever faced such a situation?
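
One workaround to experiment with (a sketch, not an official pattern; assumes Prefect 1.x): copy the Prefect logger's handlers onto the root logger, so records that LIB emits through the root logger travel through the same pipeline:

import logging
from prefect.utilities.logging import get_logger

prefect_logger = get_logger()
root_logger = logging.getLogger()
for handler in prefect_logger.handlers:
    root_logger.addHandler(handler)  # bridge root-logger records into Prefect's handlers
root_logger.setLevel(logging.INFO)
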
Justin
12/08/2021, 2:32 PM
flow = Flow("taskname")
flow.run_config = KubernetesRun(
    env={
        "POSTGRES_USER": "2234234",
        ...
    },
    image="dockerimage:latest",
)
flow.register('projectname')

Vadym Dytyniak
12/08/2021, 2:54 PM