Tim Galvin
11/07/2022, 9:12 AM
--storage-block is unset to copy.
I thought I could set up a 'dummy' local file system block in the Orion UI (on my own managed server, not Prefect Cloud); however, the prefect deployment build command says 'github', 's3', 'gcs', 'azure', 'smb' are supported types.
TL;DR - I need to set a --storage-block in my deployment, and I am reasonably certain that in my situation I do not want to be copying anything to or from different file systems and blocks. I have a common underlying filesystem at the HPC center, and my data are pretty large, large enough that I cannot reasonably expect copying to and from the disk to be feasible.
Sunjay
11/07/2022, 10:19 AM
Rabea Yousof
11/07/2022, 11:20 AM
Thom
11/07/2022, 11:34 AM
Stéphan Taljaard
11/07/2022, 1:12 PM
Flow(...).validate(), right?
vholmer
11/07/2022, 1:18 PM
Mark Li
11/07/2022, 1:58 PM
Blake Stefansen
11/07/2022, 2:06 PM
RUNNING
Engine execution of flow run '148d81ea-3cfe-4db1-a0d4-3f3f17748fb0' aborted by orchestrator: This run cannot transition to the RUNNING state from the RUNNING state.
Will add more details to thread
Mohamed Alaa
11/07/2022, 2:22 PM
JV
11/07/2022, 3:15 PM
"errorMessage": "HTTP Error 404: Not Found"
I am passing the API URL <https://api.prefect.io>. I understand that this documentation is not the latest, and I also tried the URL <https://api.prefect.cloud/> and am getting the same error. Requesting your input regarding this error in Prefect version 2.0.
Bradley Hurley
11/07/2022, 4:23 PM
Fancy Arora
11/07/2022, 5:42 PM
Jeff Hale
11/07/2022, 5:54 PM
Lukasz Mentel
11/07/2022, 6:44 PM
io.StringIO objects between tasks in prefect 2.0? Here's a minimal example I'm trying to get to work, but I'm getting weird results; I'd appreciate any help on this.
import io
from prefect import task, flow

@task
def string_to_io():
    return io.StringIO("hello prefect")

@task
def consume(string_io):
    print(type(string_io))
    return string_io.read()

@flow
def test():
    # s = string()
    str_io = string_to_io()
    r = consume(str_io)
    print(r)

test()
when I run it I get
19:38:59.511 | INFO | prefect.engine - Created flow run 'congenial-longhorn' for flow 'test'
19:38:59.708 | INFO | Flow run 'congenial-longhorn' - Created task run 'string_to_io-78a25967-0' for task 'string_to_io'
19:38:59.709 | INFO | Flow run 'congenial-longhorn' - Executing 'string_to_io-78a25967-0' immediately...
19:38:59.768 | INFO | Task run 'string_to_io-78a25967-0' - Finished in state Completed()
19:38:59.789 | INFO | Flow run 'congenial-longhorn' - Created task run 'consume-884672dc-0' for task 'consume'
19:38:59.790 | INFO | Flow run 'congenial-longhorn' - Executing 'consume-884672dc-0' immediately...
<class 'list'>
19:38:59.828 | ERROR | Task run 'consume-884672dc-0' - Encountered exception during execution:
Traceback (most recent call last):
  File "/home/lukasz/.cache/pypoetry/virtualenvs/zlotyryjek-eFYuV0Po-py3.10/lib/python3.10/site-packages/prefect/engine.py", line 1222, in orchestrate_task_run
    result = await run_sync(task.fn, *args, **kwargs)
  File "/home/lukasz/.cache/pypoetry/virtualenvs/zlotyryjek-eFYuV0Po-py3.10/lib/python3.10/site-packages/prefect/utilities/asyncutils.py", line 68, in run_sync_in_worker_thread
    return await anyio.to_thread.run_sync(call, cancellable=True)
  File "/home/lukasz/.cache/pypoetry/virtualenvs/zlotyryjek-eFYuV0Po-py3.10/lib/python3.10/site-packages/anyio/to_thread.py", line 31, in run_sync
    return await get_asynclib().run_sync_in_worker_thread(
  File "/home/lukasz/.cache/pypoetry/virtualenvs/zlotyryjek-eFYuV0Po-py3.10/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 937, in run_sync_in_worker_thread
    return await future
  File "/home/lukasz/.cache/pypoetry/virtualenvs/zlotyryjek-eFYuV0Po-py3.10/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 867, in run
    result = context.run(func, *args)
  File "/home/lukasz/projects/zlotyryjek/crap.py", line 18, in consume
    return string_io.read()
AttributeError: 'list' object has no attribute 'read'
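One possible workaround, sketched here without the Prefect decorators so that it runs on its own: pass the plain string between the two functions and rebuild the io.StringIO inside the consumer. Whether this avoids the conversion once the functions are wrapped in @task has not been verified here, and string_to_text is a hypothetical replacement for string_to_io.

```python
import io


def string_to_text():
    # Hand over the text itself rather than the io.StringIO wrapper.
    return io.StringIO("hello prefect").getvalue()


def consume(text):
    # Rebuild a file-like object locally, inside the consumer.
    string_io = io.StringIO(text)
    return string_io.read()


result = consume(string_to_text())
print(result)
```

The idea is simply that a str is a plain value with no iterator or cursor state, so there is less for an orchestration layer to reinterpret when it moves the value from one task to the next.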
So for some reason the StringIO is converted to an empty list 🤔
Kalise Richmond
11/07/2022, 7:07 PM
Nick Coy
11/07/2022, 9:12 PM
prefect.agent - An error occured while monitoring flow run <flow_run_id> The flow run will not be marked as failed, but an issue may have occurred.
Philip MacMenamin
11/07/2022, 9:28 PM
with Flow(
    "...",
    state_handlers=[utils.notify_completion, utils.notify_running],...
with
def notify_running(flow: Flow, old_state, new_state) -> State:
And I could ask what the new state is doing, e.g.
if new_state.is_running()
I'm reading https://docs.prefect.io/concepts/states/ to try to work out how to capture these flow-level state changes and am struggling.
Any pointers?
Tim Ricablanca
11/07/2022, 10:09 PM
alembic.util.exc.CommandError: Can't locate revision identified by '41e5ed9e1034'
I'll post some details in a thread about what I tried before nuking the database and starting over, but 1) is there guidance on how I should be using prefect orion database stamp or … revision before upgrading Orion from 2.6.4 -> 2.6.6 (or any other minor version)? and 2) should I be running … downgrade from the 2.6.6 package or the 2.6.4 package if I wanted to go from 2.6.6 -> 2.6.4?
Zac Hooper
11/07/2022, 11:53 PM
Crash detected! Execution was interrupted by an unexpected exception.
Looking over the logs produced by the agent I can't see the Exception thrown.
The flow looks like below. I've changed the variables used but the premise is that two lambdas are invoked but the 2nd needs to wait until the first is completed.
import json
import os

import boto3
from botocore.config import Config
from prefect import flow, get_run_logger, task
from prefect.deployments import Deployment
from prefect.filesystems import S3
from prefect.orion.schemas.schedules import CronSchedule
from utils.aws import invoke_lambda

# --- Flow Details --- #
client = "example_client"
project = "example_project"
flow_name = "Example Name"
description = """
Example Description
"""
tags = ["example"]  # Optional list of string tags for the flow. Helps to filter in UI
schedule = CronSchedule(cron="* 7-19 * * MON-FRI", timezone="Australia/Sydney")

# --- Other Flow Details (Leave as is) --- #
work_queue = "example_workqueue"
storage = S3.load("s3-storage")
output_file_name = f"{os.path.splitext(os.path.basename(__file__))[0]}.yaml"


# Body of invoke_lambda (apparently the helper imported above from utils.aws),
# included for reference. It relies on the json, boto3 and Config imports.
def invoke_lambda(function_full_name, payload, blocking=False, **kwargs):
    """
    Invokes the given lambda function in AWS.
    Args:
        function_full_name (str): Exact name of the lambda function
        payload (dict): Dictionary of data you may want to pass to lambda
        blocking (bool, optional): If you want to wait for a response from the lambda. Defaults to False.
    """
    config = Config(max_pool_connections=250, read_timeout=900, connect_timeout=900)
    lambda_client = boto3.client("lambda", config=config)
    # Change client's region
    if kwargs.get("region"):
        lambda_client = boto3.client(
            "lambda", config=config, region_name=kwargs.get("region")
        )
    # DO NOT BLOCK IF WILL RUN OVER 30 SECONDS
    payload = bytes(json.dumps(payload), encoding="utf8")
    if blocking:
        # Synchronous call: wait for the Lambda and return its decoded payload
        response = lambda_client.invoke(
            FunctionName=function_full_name,
            InvocationType="RequestResponse",
            LogType="Tail",
            Payload=payload,
        )
        res_payload = response.get("Payload").read()
        return json.loads(res_payload)
    else:
        # Asynchronous call: fire and forget, return the raw invoke response
        response = lambda_client.invoke(
            FunctionName=function_full_name,
            InvocationType="Event",
            LogType="None",
            Payload=payload,
        )
        return response


@task
def first_lambda_to_invoke():
    logger = get_run_logger()
    res = invoke_lambda("first_lambda_to_invoke", {}, True)
    logger.info(res)
    return ""


@task
def second_lambda_to_invoke():
    logger = get_run_logger()
    res = invoke_lambda("second_lambda_to_invoke", {}, True)
    logger.info(res)
    return ""


@flow
def example_flow():
    x = first_lambda_to_invoke.submit()
    # The second task only starts once the first has finished
    y = second_lambda_to_invoke.submit(wait_for=[x])


if __name__ == "__main__":
    Deployment.build_from_flow(
        flow=example_flow,
        name=flow_name,
        work_queue_name=work_queue,
        tags=[client, project] + tags,
        schedule=schedule,
        description=description,
        output=output_file_name,
        storage=storage,
        apply=True,
        path="/",
    )
Here is an example of the log output from a failed run
Created task run 'first_lambda_to_invoke-ec90046a-0' for task 'first_lambda_to_invoke'
09:32:10 AM
Submitted task run 'first_lambda_to_invoke-ec90046a-0' for execution.
09:32:10 AM
Created task run 'second_lambda_to_invoke-a66b51c9-0' for task 'second_lambda_to_invoke'
09:32:10 AM
Submitted task run 'second_lambda_to_invoke-a66b51c9-0' for execution.
09:32:10 AM
Created task run 'first_lambda_to_invoke-ec90046a-0' for task 'first_lambda_to_invoke'
09:32:10 AM
Submitted task run 'first_lambda_to_invoke-ec90046a-0' for execution.
09:32:10 AM
Created task run 'second_lambda_to_invoke-a66b51c9-0' for task 'second_lambda_to_invoke'
09:32:10 AM
Submitted task run 'second_lambda_to_invoke-a66b51c9-0' for execution.
09:32:10 AM
Crash detected! Execution was interrupted by an unexpected exception.
09:32:10 AM
first_lambda_to_invoke-ec90046a-0
{'statusCode': 200}
09:32:21 AM
first_lambda_to_invoke-ec90046a-0
Crash detected! Execution was interrupted by an unexpected exception.
Note the {'statusCode': 200} response from the Lambda showing that it ran successfully.
Ben Muller
11/08/2022, 1:57 AM
.set_dependencies(
    upstream_tasks=[my_upstream_task]
)
Oh - I can see in the message above there is a wait_for kwarg in the submit handler... Ignore me 👀
Faheem Khan
11/08/2022, 4:09 AM
BrokenPipeError: [Errno 32] Broken pipe
ConnectionResetError: [Errno 104] Connection reset by peer. Any leads would be much appreciated, cheers
Evgeny Ivanov
11/08/2022, 5:46 AM
cache_key_fn=task_input_hash and cache_expiration=timedelta(days=1). Now, for some reason, I want to clear the cache and run the task again (suppose I've changed the logic in a module I'm using from the task).
My options (including non-working):
1. Deleting cache files doesn't work. If I run a task after deleting the files, I get an error. Prefect doesn't check whether the file exists before deciding to use the cache.
2. Deleting flow runs or task runs does work. But it's not convenient for two reasons:
   a. I have to delete the history of runs.
   b. I have to remember which flow/task run to delete, or delete all of them.
3. Adding an extra parameter cache_num to the task should work. I can just change its value every time I want to avoid using the cache. But it generates extra boilerplate code, and I have to change flow/task code to change the parameter value. It looks like dirty duct tape to me.
4. Changing data in the Orion DB may be possible, but I'm sure that it's strongly not recommended, and I'd like to avoid it.
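A variant of option 3 that keeps the version out of the task code could look roughly like this. It is only a sketch: the environment variable CACHE_VERSION and the function name versioned_cache_key are made up for illustration, and it assumes the function would be passed as cache_key_fn and called with the run context plus a dict of task parameters, returning the key as a string.

```python
import hashlib
import json
import os


def versioned_cache_key(context, parameters):
    """Hash the task inputs together with a manually bumped version string."""
    # CACHE_VERSION is a hypothetical variable name; bump it to invalidate old keys.
    version = os.environ.get("CACHE_VERSION", "0")
    payload = json.dumps(
        {"version": version, "parameters": parameters},
        sort_keys=True,
        default=str,  # fall back to str() for values json cannot encode
    )
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


# Same inputs, different version: the keys differ, so a cached result is not reused.
os.environ["CACHE_VERSION"] = "1"
key_v1 = versioned_cache_key(None, {"x": 1})
os.environ["CACHE_VERSION"] = "2"
key_v2 = versioned_cache_key(None, {"x": 1})
print(key_v1 != key_v2)
```

The trade-off is that the version lives in the runtime environment rather than in the flow code, so it has to be set wherever the flow actually executes.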
Maybe there is a better option I don't know about? A perfect solution would be a CLI or API with the flow/task name as an input parameter.
Michiel Verburg
11/08/2022, 8:10 AM
Ikkyu Choi
11/08/2022, 8:13 AM
is_schedule_active means the status of the flow (turned on or turned off)?
I got a flow's is_schedule_active bool value as true by using GraphQL, but in my Prefect UI this flow is in the turned-off state. Am I thinking about this wrong?
Rohith
11/08/2022, 11:05 AM
Rohith
11/08/2022, 11:05 AM
Piotr Bródka
11/08/2022, 11:17 AM
Evgeny Ivanov
11/08/2022, 11:36 AM
param=value, 'param': 'value' or something else?
Anyway, after editing this field I cannot create the Block. Chrome gives me the following error.
UPD. I've found out that I have to fill Query and Connect Args either with some values or with null. And the right format for Connect Args is JSON:
{
    "param": "value"
}
P.S. Btw, I cannot create a Block in Firefox. Is this browser not supported?
Denis
11/08/2022, 1:08 PM
flow_run_ids=[]
for param in params:
    flow_run_id = create_flow_run(
        flow=flow_model,
        parameters=run_params,
        name=run_name,
        state=state
    )
    flow_run_ids.append(flow_run_id)
wait_for_flows(flow_run_ids)
Thanks for any help
Sunjay
11/08/2022, 2:14 PM
Sunjay
11/08/2022, 2:14 PM
Peyton Runyan
11/08/2022, 2:50 PM
Sunjay
11/08/2022, 4:07 PM
Jeff Hale
11/08/2022, 6:40 PM
Sunjay
11/15/2022, 12:08 PM
if conditions == "Foo":
    a = task1()
else:
    "set task1() state to success"  # i.e. skip execution of task1()
b = task2()  # task2 uses wait_for on the completed state of task1
So this is what I want to achieve in essence. I just wanted to know how to set the task1 state to completed, since it can be skipped when it doesn't meet a certain condition, but the downstream job depends on the success state of task1 via the wait_for parameter.
Jeff Hale
11/15/2022, 1:43 PM
def flow1(x):
    if x == "Foo":
        a = task1()
    else:
        a = "didn't run task1"
    b = task2()
    if a != "didn't run task1" and b:
        pass  # do something
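A minimal sketch of the same idea with an explicit wait, written with the standard library's concurrent.futures rather than Prefect so that it runs on its own (task1, task2 and flow1 here are plain-Python stand-ins, not Prefect tasks). The list of upstream futures is built conditionally, and an empty list simply means there is nothing to wait for. In Prefect 2 the analogous step would presumably be passing that list as wait_for to .submit(), as in the wait_for=[x] call earlier in this log.

```python
from concurrent.futures import ThreadPoolExecutor, wait


def task1():
    return "task1 done"


def task2():
    return "task2 done"


def flow1(x):
    """Run task1 only when x == "Foo"; task2 runs after whatever was submitted."""
    with ThreadPoolExecutor() as pool:
        # Collect upstream futures conditionally; skipping task1 leaves the list empty.
        upstream = [pool.submit(task1)] if x == "Foo" else []
        wait(upstream)  # returns immediately when the list is empty
        b = pool.submit(task2).result()
    a = upstream[0].result() if upstream else None
    return a, b


print(flow1("Foo"))
print(flow1("Bar"))
```

With this shape there is no need to fake a completed state for the skipped task: the downstream step depends on a list that may be empty, not on task1 itself.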
Sunjay
11/15/2022, 2:53 PM