E Li
02/02/2022, 4:08 PMJason Motley
02/02/2022, 5:26 PMAn Hoang
02/02/2022, 5:38 PMflow_A with task1, task2... task10. Now I want to construct flow_B with task1, task2... task20 and task11_result = task11(task10_result). How do I add the tasks of flow_A as upstream tasks for flow_B? I know I can do create_flow_run, but I do not want monitoring for task1...task10 in flow_B.
Kind of like:
with Flow("flow_A") as flow:
    flow_A_param1 = Parameter("flow_A_param1")
    ...
    flow_A_param_n = Parameter(...)
    ...
    task1 = ....
    ...
    task10_result = task10(...)
with Flow("flow_B") as flow:
    flow_A_param1 = Parameter(...)
    ...
    flow_A_param_n = Parameter(...)
    flow_A_result = flow_A.run(parameters={"flow_A_param1": flow_A_param1, ...., "flow_A_param_n": flow_A_param_n})  # don't want monitoring for these tasks
    task10_result = flow_A_result.result[flow_A.get_task(task10)]
    task11_result = task11(task10_result)
    ...
    task20_result = task20(...)
where `flow_B`'s parameters are just `flow_A`'s parameters. But in the example above I always have to repeat the parameters of flow_A in flow_B and keep them both up to date with each other. Is there a better way to do this?Chris Reuter
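One pattern that avoids repeating the parameter list is to define the shared names once and derive both flows' parameters from that single list. A minimal sketch only, with hypothetical names, using plain Python stand-ins instead of real prefect.Parameter objects:

```python
# Hypothetical sketch: keep the shared parameter names in ONE place so
# flow_A and flow_B cannot drift apart. In a real Prefect flow each name
# would become Parameter(name); here a dict of name -> placeholder
# string stands in for that.
SHARED_PARAM_NAMES = ["flow_A_param1", "flow_A_param2"]

def build_parameters(names):
    """Build a parameters mapping from the single shared list of names."""
    return {name: f"Parameter({name!r})" for name in names}

# Both flows derive their parameters from the same list, so adding a
# parameter to SHARED_PARAM_NAMES updates both flows at once.
flow_a_params = build_parameters(SHARED_PARAM_NAMES)
flow_b_params = build_parameters(SHARED_PARAM_NAMES)
```

The same factory idea works with real Parameter objects: the factory is called inside each flow's context so the tasks are registered with that flow.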
02/02/2022, 6:00 PMDevin Flake
02/02/2022, 7:51 PMNelson Griffiths
02/02/2022, 9:58 PMjspeis
02/03/2022, 1:43 AMupstream_tasks
kwarg (e.g. as in the example below), I get Unexpected keyword argument 'upstream_tasks' in function call (unexpected-keyword-arg)
... is there a way I can get pylint to recognize this is ok? (other than disabling?)
@task
def add_one(val):
    return val + 1

with Flow("Add One") as flow:
    p = Parameter("param")
    first = add_one(p)
    second = add_one(p, upstream_tasks=[first])
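One possible approach, offered as an assumption rather than a verified fix for this setup: recent pylint versions have a signature-mutators option for decorators that change a decorated function's call signature, which may cover the @task decorator here. A sketch of the config (the section name and decorator path may need adjusting for your pylint version):

```ini
# .pylintrc -- hypothetical sketch: tell pylint that prefect's task
# decorator mutates the decorated function's signature, so extra
# kwargs such as upstream_tasks are not flagged as unexpected.
[TYPECHECK]
signature-mutators=prefect.task
```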
Sumit Kumar Rai
02/03/2022, 6:47 AMSuresh R
02/03/2022, 10:22 AMMuddassir Shaikh
02/03/2022, 12:20 PMIevgenii Martynenko
02/03/2022, 2:26 PMArnaldo Russo
02/03/2022, 2:44 PMArnaldo Russo
02/03/2022, 2:53 PMCaleb Ejakait
02/03/2022, 3:10 PMTalmaj Marinč
02/03/2022, 3:44 PMprefect get logs -n name_of_the_run
but they seemed to be incomplete. I get only 2636 lines, for less than an hour of the flow. The logs are there since I see them in the UI. Any suggestions?Matan Drory
02/03/2022, 5:35 PMprefect[aws]==0.15.3
. We have a flow that generates a large number of batch jobs, then submits them, splits them into chunks of 100, and generates an AWS waiter object per 100.
We have one very large job with over 450 batch calls. This job seems to be stuck even though it progressed until the end.
When tracking the progress we can see that a few tasks are stuck as mapped even though all child tasks are done, i.e. 472 successful definitions created in create_ive_analysis, then all of them were submitted and chunked in submit_jobs_analysis, and then all 5 AWSClientWait jobs were done (we call it with map on the chunked job ids). The parent block is still in mapped mode.
Also, sometimes an AWSClientWait task fails and the job doesn't fail, it just stays there (again this is with a mapped AWSClientWait).
Wait code
wait_res = AWSClientWait(client='batch', waiter_name='JobComplete').map(waiter_kwargs=batched_waits)
Where batched_waits is created by
@task
def submit_jobs_and_batch_job_awaits(jobs: List, batched_num=BATCH_CLIENT_WAIT_MAX_SIZE):
    submitted_jobs = [BatchSubmit().run(**job) for job in jobs]
    waits = []
    for i in range(0, len(submitted_jobs), batched_num):
        waits.append(
            {
                'jobs': submitted_jobs[i : i + batched_num],
                'WaiterConfig': {
                    'Delay': 10,
                    'MaxAttempts': 10000,
                },
            }
        )
    return waits
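The batching step above reduces to slicing the submitted job ids into fixed-size chunks and attaching a waiter config to each chunk. A standalone sketch of just that logic (names hypothetical, no AWS calls), which with 472 jobs yields the 5 batches mentioned above:

```python
# Hypothetical standalone version of the chunking done in
# submit_jobs_and_batch_job_awaits: slice job ids into fixed-size
# batches and attach a waiter config to each batch.
def batch_waiter_kwargs(job_ids, batch_size=100):
    batches = []
    for i in range(0, len(job_ids), batch_size):
        batches.append(
            {
                "jobs": job_ids[i : i + batch_size],
                "WaiterConfig": {"Delay": 10, "MaxAttempts": 10000},
            }
        )
    return batches

# 472 jobs in batches of 100 -> 5 batches (4 full, 1 with 72 jobs).
waits = batch_waiter_kwargs([f"job-{n}" for n in range(472)])
```

Note that the original function must return the accumulated list (`return waits`); a bare `return` would hand `None` to the mapped AWSClientWait call.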
What could cause that?Carlos Paiva
02/03/2022, 6:04 PMPayam K
02/03/2022, 9:37 PM- task: AWSShellScript@1
inputs:
# awsCredentials: 'xxxxx'
regionName: 'xxx'
scriptType: 'inline'
inlineScript: python3 cli.py -remote S3 -p "['a','2020-09-01', '2020-09-02']"
I get this error:
[2022-02-03 21:29:07+0000] ERROR - prefect.S3Result | Unexpected error while reading from S3: TypeError('expected string or bytes-like object')
Traceback (most recent call last):
  File "/miniconda3/lib/python3.7/site-packages/prefect/engine/results/s3_result.py", line 142, in exists
    self.client.get_object(Bucket=self.bucket, Key=location.format(**kwargs))
  File "/miniconda3/lib/python3.7/site-packages/botocore/client.py", line 391, in _api_call
    return self._make_api_call(operation_name, kwargs)
  File "/miniconda3/lib/python3.7/site-packages/botocore/client.py", line 692, in _make_api_call
    api_params, operation_model, context=request_context)
  File "/miniconda3/lib/python3.7/site-packages/botocore/client.py", line 738, in _convert_to_request_dict
    api_params, operation_model, context)
  File "/miniconda3/lib/python3.7/site-packages/botocore/client.py", line 770, in _emit_api_params
    params=api_params, model=operation_model, context=context)
  File "/miniconda3/lib/python3.7/site-packages/botocore/hooks.py", line 357, in emit
    return self._emitter.emit(aliased_event_name, **kwargs)
  File "/miniconda3/lib/python3.7/site-packages/botocore/hooks.py", line 228, in emit
    return self._emit(event_name, kwargs)
  File "/miniconda3/lib/python3.7/site-packages/botocore/hooks.py", line 211, in _emit
    response = handler(**kwargs)
  File "/miniconda3/lib/python3.7/site-packages/botocore/handlers.py", line 238, in validate_bucket_name
    if not VALID_BUCKET.search(bucket) and not VALID_S3_ARN.search(bucket):
TypeError: expected string or bytes-like object
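The final frame suggests the Bucket value reaching botocore's validator is None rather than a string (an inference from the traceback, not confirmed here, e.g. an S3Result bucket that was never set in this environment). The same TypeError can be reproduced with a plain regex search on None:

```python
import re

# botocore validates bucket names with compiled regexes; passing a
# None bucket into .search() raises the TypeError seen above.
# Simplified illustrative pattern, NOT botocore's actual one.
VALID_BUCKET = re.compile(r"^[a-zA-Z0-9.\-_]{1,255}$")

def check_bucket(bucket):
    try:
        return bool(VALID_BUCKET.search(bucket))
    except TypeError as exc:
        # e.g. "expected string or bytes-like object" (wording varies
        # slightly across Python versions)
        return str(exc)

ok = check_bucket("my-bucket")
err = check_bucket(None)
```

If this inference holds, the fix would be making sure the S3 result/storage bucket is configured in the AWSShellScript environment, not just locally.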
the same task runs fine when I just run it on an Azure Devops agent:
- task: AWSShellScript@1
  inputs:
    # awsCredentials: 'xxxxx'
    regionName: 'xxx'
    scriptType: 'inline'
    inlineScript: python3 cli.py -local S3 -p "['a','2020-09-01', '2020-09-02']"
has anyone had this issue before?Moises Vera
02/03/2022, 11:20 PMFileNotFoundError: [Errno 2] No such file or directory: 'data/file.csv'
◦ this in prefect cloud
◦ mine is a basic config: a local agent on a server
◦ my code is in a directory where I want to create the file with a directory
◦ the failing line of code is a pathlib.mkdir statement
I'm sure there is something I don't understand about Prefect, because I cannot manipulate the local filesystem on the server with Prefect Cloud. Any ideas?Farid
02/04/2022, 12:45 AMSTORAGE
for the flows and have the Gitlab CI/CD register the flows to the Prefect Cloud.
However, I receive an error when I try to run the dummy flow taken from the official docs:
from prefect import task, Flow
from prefect.storage import Gitlab

@task(log_stdout=True)
def hello_task():
    text = f"hello World"
    print(text)
    return text

with Flow(
    "hello-flow",
    storage=Gitlab(
        repo="predicthq/data-engineering/prefect-server",
        path="flows/hello-world/flow_hello_flow.py",
        access_token_secret="",
    ),
) as flow:
    hello_task()
The error:
(.venv) ➜ prefect-server git:(main) ✗ python flows/hello-world/flow_hello_flow.py
Traceback (most recent call last):
  File "/Users/farid/WorkSpace/Github_Repos/dataobs-poc/prefect-server/flows/hello-world/flow_hello_flow.py", line 2, in <module>
    from prefect.storage import Gitlab
ImportError: cannot import name 'Gitlab' from 'prefect.storage' (/Users/farid/WorkSpace/Github_Repos/dataobs-poc/prefect-server/.venv/lib/python3.9/site-packages/prefect/storage/__init__.py)
Looking at the source code, I don't see anything that could cause a circular import. Any idea what could be causing this?
Needless to say, I have installed the gitlab extra: pip install 'prefect[gitlab]'
davzucky
02/04/2022, 2:47 AMChristopher
02/04/2022, 7:40 AMThomas Fredriksen
02/04/2022, 12:06 PM{DOMAIN}/{PROJECT_NAME}
(or longer) or similar? For example iot_devices/site-temperature-management
Amanda Wee
02/04/2022, 12:23 PMREADME.md
is broken: it links to /core/, which means accessing it from Github causes it to link to https://github.com/PrefectHQ/prefect/blob/master/core instead of https://docs.prefect.io/core/. I'd suggest linking directly to https://docs.prefect.io/
as per the previous behaviour. @Anna Geller git blame says you made the change 😛Amar Eid
02/04/2022, 12:40 PMKonstantin
02/04/2022, 1:26 PMPatrick Alves
02/04/2022, 2:03 PM@task(trigger=any_failed)
def alert_email(email, message):
sendmail(email, '[Prefect] ADCheck - [FAILED]', message)
with Flow("AD Check") as flow:
# Parameters
admin_email = Parameter("admin_email", default="<mailto:patrick@xyz.br|patrick@xyz.br>")
# Tasks
message = "some message"
try:
ad_conn = connect_active_directory(ad_server, ad_user, ad_pass)
except Exception as e:
message = f'Erro ao executar workflow: {e}'
# If all tasks are successful
notify_email(admin_email, message)
# If any task fails
alert_email(admin_email, message)
When the task fails, I would like to get the Exception error and save it to the message variable, but I am getting Trigger was "all_successful" but some of the upstream tasks failed.
@Anna Geller, any tips?
I've tried to use state_handlers, but I need to be able to change the parameters of the alert function for each workflow (email, subject, etc.).
So triggers seem better. But I could not get the error.Andrea Nerla
02/04/2022, 3:08 PMFile "C:\Users\andrea.nerla\AppData\Local\Programs\Python\Python39\lib\site-packages\box\box.py", line 516, in __getattr__
value = object.__getattribute__(self, item)
AttributeError: 'Config' object has no attribute 'datefmt'
Matthew Seligson
02/04/2022, 6:41 PMWilliam Grim
02/04/2022, 8:41 PM$ prefect run -n example --watch
Looking up flow metadata...Done
Creating run for flow 'example'...Done
└── Name: eccentric-groundhog
└── UUID: bc79f3ec-5525-4be6-b327-375624abb387
└── Labels: ['caretaker', 'input', 'output', 'prefect-agent-556bd57fdf-v74zj']
└── Parameters: {}
└── Context: {}
└── URL: <http://localhost:8080/default/flow-run/bc79f3ec-5525-4be6-b327-375624abb387>
Watching flow run execution...
└── 20:33:26 | INFO | Entered state <Scheduled>: Flow run scheduled.
└── 20:33:43 | WARNING | It has been 15 seconds and your flow run has not been submitted by an agent. Agent 93e9ff4d-5fce-4b1d-ad1b-59925fd32f92 (agent) has matching labels and last queried a few seconds ago. It should deploy your flow run.
└── 20:34:16 | WARNING | It has been 50 seconds and your flow run has not been submitted by an agent. Agent 93e9ff4d-5fce-4b1d-ad1b-59925fd32f92 (agent) has matching labels and last queried a few seconds ago. It should deploy your flow run.
No agent is picking up any of our flows, and flow runs just stay in the "scheduled" state even though on a CLI run, it states that there is an agent with matching labels.Kevin Kho
02/04/2022, 8:44 PMAnna Geller
02/04/2022, 8:45 PMprefect run -n example --execute --watch
William Grim
02/04/2022, 8:46 PMexample
flow has an input
label, and the agents have that label and more.Anna Geller
02/04/2022, 8:48 PMWilliam Grim
02/04/2022, 8:49 PMAnna Geller
02/04/2022, 8:50 PMflow = Flow("local-flow", storage=Local(add_default_labels=False))
and on the agent:
prefect agent local start --no-hostname-label
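The rule behind these label flags, as documented for Prefect 1.x agents, is a subset check: an agent only picks up a flow run when the flow's labels are a subset of the agent's labels. A minimal sketch with hypothetical label values mirroring this thread, where a default hostname label on the flow prevents pickup:

```python
# Prefect 1.x label matching: an agent claims a flow run only when the
# flow's label set is a subset of the agent's label set.
def agent_can_run(agent_labels, flow_labels):
    return set(flow_labels) <= set(agent_labels)

agent = {"company_name", "input", "output", "caretaker"}

# A default hostname label the agent does not carry blocks pickup:
flow_with_hostname = {"input", "output", "caretaker", "prefect-agent-556bd57fdf-v74zj"}
# With add_default_labels=False the flow's labels are all matched:
flow_without_hostname = {"input", "output", "caretaker"}

stuck = agent_can_run(agent, flow_with_hostname)        # False
runnable = agent_can_run(agent, flow_without_hostname)  # True
```

This is why disabling default labels on both the storage and the agent makes the two sets line up.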
William Grim
02/04/2022, 8:54 PMAnna Geller
02/04/2022, 8:56 PMWilliam Grim
02/04/2022, 8:56 PM── 21:01:25 | WARNING | It has been 50 seconds and your flow run has not been submitted by an agent. Found 5 healthy agents with matching labels. One of them should pick up your flow.
Anna Geller
02/04/2022, 9:04 PMWilliam Grim
02/04/2022, 9:08 PM/usr/local/bin/python /usr/local/bin/prefect agent local start -l company_name -l input -l output -l caretaker --show-flow-logs
with Flow(
    "example",
    state_handlers=[save_activity],
    run_config=LocalRun(labels=["input", "output", "caretaker"]),
    storage=storage.Local(add_default_labels=False),
) as flow:
Anna Geller
02/04/2022, 9:16 PMprefect agent local start -l company_name -l input -l output -l caretaker --show-flow-logs --no-hostname-label
William Grim
02/04/2022, 9:17 PMAnna Geller
02/04/2022, 9:18 PMWilliam Grim
02/04/2022, 9:18 PMAnna Geller
02/04/2022, 9:21 PMprefect agent local start -l company_name -l input -l output -l caretaker --show-flow-logs --no-hostname-label
flow:
with Flow(
    "example",
    state_handlers=[save_activity],
    run_config=LocalRun(labels=["input", "output", "caretaker"]),
    storage=storage.Local(add_default_labels=False),
) as flow:
and you need to make sure that you register this flow from the same machine as the agent, otherwise this won't work, because you specified that your storage is local to the agent atm.
If you run agents on a remote VM, you can explore other storage options like the Git storage classes or cloud storage classes.William Grim
02/04/2022, 9:21 PMprefect run
say explicitly that an agent should pick up the flowAnna Geller
02/04/2022, 9:23 PMWilliam Grim
02/04/2022, 9:23 PMAnna Geller
02/04/2022, 9:24 PMWilliam Grim
02/04/2022, 9:24 PMAnna Geller
02/04/2022, 9:25 PMWilliam Grim
02/04/2022, 9:25 PMAnna Geller
02/04/2022, 9:28 PMWilliam Grim
02/04/2022, 9:29 PMAnna Geller
02/04/2022, 9:29 PMWilliam Grim
02/04/2022, 9:29 PMAnna Geller
02/04/2022, 9:31 PMWilliam Grim
02/04/2022, 9:32 PMAnna Geller
02/04/2022, 9:33 PMWilliam Grim
02/04/2022, 9:34 PMAnna Geller
02/04/2022, 9:34 PMprefect agent kubernetes install > agent.yaml # adjust yaml and apply
kubectl apply -f agent.yaml
William Grim
02/04/2022, 9:35 PMAnna Geller
02/04/2022, 9:35 PMWilliam Grim
02/04/2022, 9:35 PMAnna Geller
02/05/2022, 10:25 AM