Tilak Maddy
12/06/2021, 10:54 AM
UserWarning: A flow with the same name is already contained in storage; if you changed your Flow since the last build, you might experience unexpected issues and should re-create your storage object.
serialized_flow = flow.serialize(build=build) # type: Any
Flow URL: https://cloud.prefect.io/XXX/flow/ZZZ
└── ID: XXXX
└── Project: test_proj_2
└── Labels: ['dev']
Hey y'all, I am running flows in multiple projects, but all the flows have the same name and the same storage (by design; I don't wish to change the name). However, when I register the flow to several projects I get this warning every time I call flow.register(project_name) (with different metadata of course, like schedule time, etc.) on a different project. I want to know what that warning means. Since I have the flows set up in production, I want to know: if I update the flow metadata for one project, is it going to do the other flows dirty?
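For reference, a minimal sketch of the registration pattern being described (the task and the first project name are made up; only test_proj_2 appears above):

from prefect import Flow, task

@task
def say_hi():
    print("hi")

# Same flow name and storage object reused across projects, by design.
with Flow("shared-flow-name") as flow:
    say_hi()

flow.register(project_name="test_proj_1")  # hypothetical first project
flow.register(project_name="test_proj_2")  # registering the same name/storage again triggers the UserWarning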
Jelle Vegter
12/06/2021, 12:11 PM
Thomas Nyegaard-Signori
12/06/2021, 2:03 PM
We are running CreateNameSpacedJob tasks, spawning jobs on our kubernetes cluster. We are hosting our own Prefect server using the docker-compose command directly on a moderately large VM (Standard E8ds v5 VM type from Azure).
It seems as if the flow-pod, the pod responsible for orchestrating the tasks, is losing connection to the backend, specifically the apollo service, as can be seen in the first screenshot. All of a sudden, all CreateNameSpacedJob tasks would fail at the same time when the CloudTaskRunner went to update the state of the task. I did a bit of digging with netstat and it seems that there are quite a few TCP connections being created in the apollo container; however, I am not entirely sure if that is "business as usual" or a bit on the heavy side for this kind of setup. Has anyone else experienced these kinds of hiccups, or is anyone using a similar setup who might have ideas? I don't know whether the second screenshot is of relevance, but it has started to pop up quite a lot and I can't seem to figure out what's causing it.
brian
12/06/2021, 3:17 PM
Jason Motley
12/06/2021, 3:30 PM
Daniil Ponizov
12/06/2021, 4:46 PM
Hugo Shi
12/06/2021, 6:05 PM
Horatiu Bota
12/06/2021, 6:34 PM
s3result.write(overwrite=True))?
Theo Platt
12/06/2021, 7:25 PM
Isaac Brodsky
12/06/2021, 8:19 PM
Has anyone had issues calling dask.compute() when using a Dask Executor? I'm trying to compare my upgraded (0.13.x to latest) flow code with a working example, and right now my compute calls hang.
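Not a diagnosis, but for comparison, a sketch of one pattern that avoids nested-compute hangs when a task itself calls Dask: hand the nested work back to the cluster with dask.distributed.worker_client instead of calling dask.compute() inside the task:

import dask
from dask.distributed import worker_client
from prefect import Flow, task
from prefect.executors import DaskExecutor

@task
def heavy_sum():
    parts = [dask.delayed(lambda i: i * i)(i) for i in range(10)]
    # dask.compute(parts) here would run on the worker this task occupies
    # and can hang waiting for free workers; worker_client secedes from the
    # worker's thread pool while the nested work runs on the cluster.
    with worker_client() as client:
        futures = client.compute(parts)
        return sum(client.gather(futures))

with Flow("nested-dask") as flow:
    heavy_sum()

if __name__ == "__main__":
    flow.run(executor=DaskExecutor())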
Zane Selvans
12/06/2021, 8:36 PM
Leon Kozlowski
12/06/2021, 9:07 PM
When running prefect build I am running into an error: mkdir: cannot create directory '/opt/prefect/': Permission denied. I am creating a user in my Dockerfile through useradd, and it appears there are extra commands Prefect appends to the user-defined Dockerfile - is the root user required?
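A sketch of the setup being described, assuming Docker storage pointed at the user-defined Dockerfile (the registry and image names are made up); the failing mkdir comes from the build steps Prefect appends after that Dockerfile, so they run as whichever USER it last switched to:

from prefect.storage import Docker

# If the Dockerfile ends with `USER appuser`, the appended mkdir for
# /opt/prefect/ runs as that non-root user and fails with "Permission
# denied". Pre-creating /opt/prefect owned by that user (or switching back
# to root at the end of the Dockerfile) is one way around it.
storage = Docker(
    registry_url="registry.example.com",  # made up
    image_name="my-flow",
    dockerfile="Dockerfile",
)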
James McNeilis
12/06/2021, 9:59 PM
Failed to set task state with error: ClientError([{'path': ['set_task_run_states'], 'message': 'An unknown error occurred.', 'extensions': {'code': 'INTERNAL_SERVER_ERROR'}}])
Traceback (most recent call last):
File "/usr/local/lib/python3.8/site-packages/prefect/engine/cloud/task_runner.py", line 91, in call_runner_target_handlers
state = self.client.set_task_run_state(
File "/usr/local/lib/python3.8/site-packages/prefect/client/client.py", line 1917, in set_task_run_state
result = self.graphql(
File "/usr/local/lib/python3.8/site-packages/prefect/client/client.py", line 569, in graphql
raise ClientError(result["errors"])
prefect.exceptions.ClientError: [{'path': ['set_task_run_states'], 'message': 'An unknown error occurred.', 'extensions': {'code': 'INTERNAL_SERVER_ERROR'}}]
I've put the task and flow runs urls in the 🧵 .Alvaro Durán Tovar
12/06/2021, 10:23 PM
PREFECT__CLOUD__SEND_FLOW_RUN_LOGS=true
PREFECT__LOGGING__LOG_TO_CLOUD=true
I'm using @task(log_stdout=True) and I can see the message in kubernetes itself. I can see the message "hey budy" in the log, as you can see here, but it is not present in the cloud UI. Am I missing something?
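One thing worth checking (an assumption, not a confirmed fix): the logging setting needs to reach the flow-run pod itself, e.g. via the run config, not only the agent. A minimal sketch:

from prefect import Flow, task
from prefect.run_configs import KubernetesRun

@task(log_stdout=True)
def greet():
    # log_stdout=True routes this print through the Prefect logger
    print("hey budy")

with Flow("k8s-logging-check") as flow:
    greet()

# Put the setting in the flow-run pod's environment so the runner there
# ships logs to the backend as well.
flow.run_config = KubernetesRun(
    env={"PREFECT__CLOUD__SEND_FLOW_RUN_LOGS": "true"}
)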
Dominic Pham
12/07/2021, 12:06 AM
alins
12/07/2021, 11:35 AM
sarika gowtham
12/07/2021, 11:57 AM
executor = DaskExecutor(address="tcp://127.0.0.1:8787")
flow.run(executor=executor)
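For context, a self-contained version of that snippet (the task and flow name are made up); note that 8787 is usually the Dask dashboard port, while the scheduler itself listens on 8786 by default:

from prefect import Flow, task
from prefect.executors import DaskExecutor

@task
def inc(x):
    return x + 1

with Flow("dask-executor-example") as flow:
    inc.map(list(range(10)))

# Assumes a Dask scheduler is already running and reachable at this address.
executor = DaskExecutor(address="tcp://127.0.0.1:8786")
flow.run(executor=executor)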
Zheng Xie
12/07/2021, 1:05 PM
Tilak Maddy
12/07/2021, 1:21 PM
with Flow(...) as flow:
    a = first_task()
    b = second_task()
Say the tasks are defined in and imported from other files (which in turn call other tasks, and so on). There is no way I can look at just this main file and tell how the entire flow is going to look: what dependencies all the tasks will have, retries, etc. Basically there isn't much info I can get. So what are we doing here? (and why?)
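A sketch of ways to inspect the assembled flow once the imports are resolved (the my_tasks module is made up):

from prefect import Flow
from my_tasks import first_task, second_task  # hypothetical module

with Flow("inspect-me") as flow:
    a = first_task()
    b = second_task()

flow.visualize()             # render the DAG (needs graphviz installed)
print(flow.tasks)            # every task Prefect collected, wherever defined
for edge in flow.edges:      # the actual dependencies between tasks
    print(edge.upstream_task, "->", edge.downstream_task)
print({t.name: t.max_retries for t in flow.tasks})   # retry settings per task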
Ievgenii Martynenko
12/07/2021, 1:24 PM
Jason Motley
12/07/2021, 3:20 PM
John Shearer
12/07/2021, 6:16 PM
Would a flow run with PREFECT__FLOWS__CHECKPOINTING=false, but with checkpoint data present in the Prefect result directory, read from those results? I would expect this, but this is the current behaviour (on my machine ...)
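For reference, a minimal sketch of the situation (the task, result directory, and target file name are made up); the question is whether a run like this reads ./prefect-results/expensive.pkl back when the file already exists from an earlier run:

import os
# Set before importing prefect so the setting is picked up by the config.
os.environ["PREFECT__FLOWS__CHECKPOINTING"] = "false"

from prefect import Flow, task
from prefect.engine.results import LocalResult

@task(target="expensive.pkl", checkpoint=True,
      result=LocalResult(dir="./prefect-results"))
def expensive():
    print("computing...")
    return 42

with Flow("checkpoint-read-check") as flow:
    expensive()

flow.run()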
Erik Schomburg
12/07/2021, 6:31 PM
The task.map functionality doesn't quite work as I had expected. I'm aggregating a bunch of data sources and rows into a DataFrame, and previously this had been sped up by running it over subsets of keys, with each subset running in a different process. So in prefect, I have a task that splits the keys into subsets, and then maps a task over those subsets. The problem is that there's a small probability of failure on each subset, due to connection timeouts. I have added some retry logic, but I still want to make sure that successful sub-tasks have their results checkpointed, and unsuccessful ones are not. But the results = task.map(subset=subsets) code instead just stores the results in a single file, and then does not re-run unsuccessful sub-tasks. I tried adding {map_index} to the task target filename pattern (at first with doubled { brackets, i.e., {{map_index}} 🤦).
Here’s the basic flow:
all_keys = get_keys_task()
key_subsets = partition_keys_task(all_keys, n_subsets)
data_subsets = get_data_task.map(keys=key_subsets)
all_data = concatenate_subsets_task(data_subsets=data_subsets)
I know I can work around this by writing my own utility to create a list of tasks with their own unique task names, but it seems like part of the point of .map ought to be to do this sort of results management for you... Any tips? Maybe there's just a parameter in the prefect.task decorator or the task.map function I don't know about?
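For what it's worth, a sketch of per-subset checkpointing via a templated target; this assumes map_index is available in prefect.context when the target string is rendered (single braces), and the file names, retries, and toy data are made up:

from datetime import timedelta
from prefect import Flow, task
from prefect.engine.results import LocalResult

@task
def partition_keys_task(all_keys, n_subsets):
    return [all_keys[i::n_subsets] for i in range(n_subsets)]

@task(
    checkpoint=True,                      # requires PREFECT__FLOWS__CHECKPOINTING=true for local runs
    result=LocalResult(dir="./prefect-results"),
    target="get_data-{map_index}.pkl",    # one result file per mapped child
    max_retries=3,
    retry_delay=timedelta(seconds=30),
)
def get_data_task(keys):
    # stand-in for "fetch rows for this subset of keys"
    return [{"key": k, "value": len(k)} for k in keys]

with Flow("mapped-checkpoints") as flow:
    key_subsets = partition_keys_task(["a", "bb", "ccc", "dddd"], 2)
    data_subsets = get_data_task.map(keys=key_subsets)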
Tony Yun
12/07/2021, 7:49 PM
from pathlib import Path
from prefect import task

@task(log_stdout=True)
def test():
    print(f'current folder: {Path.cwd()}')
    raise Exception('test exception')
Task 'test': Exception encountered during task execution!
Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/prefect/engine/task_runner.py", line 863, in get_task_run_state
logger=self.logger,
File "/usr/local/lib/python3.7/site-packages/prefect/utilities/executors.py", line 445, in run_task_with_timeout
return task.run(*args, **kwargs) # type: ignore
File "/Users/tonyyun/github/data_operations/dbt_automation/flow.py", line 296, in test
Exception: test exception
, where /Users/tonyyun/github shouldn't be there ^
Jeffery Newburn
12/07/2021, 8:34 PM
bitsofinfo
12/07/2021, 8:56 PM
Jason Motley
12/07/2021, 9:43 PM
if (select * from table) = error then (alter table add column) else somethingelse?
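Reading that pseudocode literally, a rough runnable sketch in Python (the table, column, and database names are made up, it assumes my_table already exists, and the "= error" check becomes a try/except around the SELECT):

import sqlite3
from prefect import Flow, task

@task
def ensure_column(db_path: str = "example.db"):
    conn = sqlite3.connect(db_path)
    try:
        conn.execute("SELECT new_col FROM my_table LIMIT 1")
        outcome = "column already exists"   # the "else somethingelse" branch
    except sqlite3.OperationalError:
        conn.execute("ALTER TABLE my_table ADD COLUMN new_col TEXT")
        conn.commit()
        outcome = "column added"
    finally:
        conn.close()
    return outcome

with Flow("conditional-ddl") as flow:
    ensure_column()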
Billy McMonagle
12/07/2021, 10:45 PM
Emma Rizzi
12/08/2021, 8:11 AM
"A single agent can manage many concurrent flow runs" -> does this mean that if I schedule multiple flows at the same time for the same agent, the agent will be able to run them in parallel? Or does it queue the jobs?
Thanks
Amanda Wee
12/08/2021, 8:57 AM
Emma Rizzi
12/08/2021, 2:08 PM
Kevin Kho
12/08/2021, 2:13 PM
Emma Rizzi
12/10/2021, 10:26 AM