William Wagner
05/09/2022, 4:54 PM
__main__), is there an in-process Orion orchestrator running alongside it?
2. If yes to 1), is the flow metadata persisted in a local SQLite instance on disk?
3. Is this "ephemeral" server-less workflow intended purely for testing, or is it production-viable?
Sumant Agnihotri
05/09/2022, 5:31 PM
a(), b(), and c(), each of which waits for 2 seconds and prints the current time.
Next, I created the following flows:
from prefect import Flow
from prefect.executors import LocalDaskExecutor
with Flow("flow-a") as flow_a:
    a()
    b()
with Flow("parent-flow") as flow:
    c()
    flow_a.run(executor=LocalDaskExecutor())
flow.run(executor=LocalDaskExecutor())
Here, to my surprise, tasks a and b run first, and task c runs 2 seconds later.
Example output:
b: 23:04:32
a: 23:04:32
c: 23:04:34
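The timestamps are consistent with each Prefect 1.x `.run()` call blocking until its flow has finished: flow-a runs a and b concurrently, and only then does the parent run start c. That holds whether flow_a.run() is called inside the parent `with` block (where it executes immediately, while the parent flow is still being built) or just before flow.run(). The sketch below models that timing with standard-library threads rather than Prefect; `sleepy`, `run_group`, `two_runs`, `one_run`, and the shortened `DELAY` are hypothetical names. In Prefect 1.x terms, `one_run` corresponds to putting all three tasks into a single flow run with LocalDaskExecutor.

```python
import time
from concurrent.futures import ThreadPoolExecutor

DELAY = 0.2  # shortened stand-in for the 2-second sleep in tasks a, b and c


def sleepy(name):
    """Hypothetical task: sleep for DELAY seconds, then report when it finished."""
    time.sleep(DELAY)
    return name, time.perf_counter()


def run_group(names):
    """Run one group of tasks concurrently and block until every task is done,
    roughly what a blocking flow.run() with a parallel executor does."""
    with ThreadPoolExecutor(max_workers=len(names)) as pool:
        return list(pool.map(sleepy, names))


def two_runs():
    """flow_a.run() followed by flow.run(): c cannot start until a and b are done."""
    start = time.perf_counter()
    finished = run_group(["a", "b"]) + run_group(["c"])
    return time.perf_counter() - start, finished


def one_run():
    """All three tasks in a single run: a, b and c overlap."""
    start = time.perf_counter()
    finished = run_group(["a", "b", "c"])
    return time.perf_counter() - start, finished


if __name__ == "__main__":
    seq_elapsed, _ = two_runs()
    par_elapsed, _ = one_run()
    print(f"two sequential runs: {seq_elapsed:.2f}s")
    print(f"one combined run:    {par_elapsed:.2f}s")
```

On a typical machine the first figure is roughly twice the second, mirroring the 2-second gap before c in the output above.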
I want all three to run in parallel. What am I doing wrong? (Sorry if this is not the right forum for these questions.)
Jan Domanski
05/09/2022, 6:37 PM
Christian Nuss
05/09/2022, 6:52 PM
Michelle Brochmann
05/09/2022, 7:45 PM
S3Download, S3List, and S3Upload tasks (available in 1.2 from the prefect.tasks.aws.s3 module) available?
Jason
05/09/2022, 7:49 PM
Alvaro Durán Tovar
05/09/2022, 8:49 PM
Mike Vanbuskirk
05/09/2022, 9:28 PM
Dylan
05/09/2022, 9:32 PM
Matt O'Brien
05/09/2022, 10:27 PM
Michelle Brochmann
05/09/2022, 10:42 PM
.fn returns a coroutine?
I tried this:
from prefect_aws.s3 import s3_upload
...
with prefect_test_harness():
    my_upload = s3_upload.fn(bucket=S3_BUCKET_NAME, key='B5_key', data=b'55555', aws_credentials=AwsCredentials())
    asyncio.run(my_upload)
But it fails with this runtime error:
E RuntimeError: There is no active flow or task run context.
../valo-prefect-poc/.venv/lib/python3.7/site-packages/prefect/logging/loggers.py:91: RuntimeError
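The error suggests that calling `.fn` bypasses the task engine, so no flow or task run context is set when the task body asks for its run logger. Prefect 2 keeps its run context in context variables; the sketch below is a toy model of that failure mode using only `contextvars` and `asyncio`, not Prefect's actual implementation. `run_context`, `get_run_logger_stub`, `upload_body`, and `call_inside_run` are hypothetical names. If the model holds, the practical workaround is to call the task from inside a flow, so the engine sets the context, rather than calling `.fn` directly.

```python
import asyncio
import contextvars
import logging

# Hypothetical stand-in for the run context an engine sets while a task runs.
run_context: contextvars.ContextVar = contextvars.ContextVar("run_context", default=None)


def get_run_logger_stub() -> logging.Logger:
    """Toy run-logger lookup: fails when no run context is set."""
    ctx = run_context.get()
    if ctx is None:
        raise RuntimeError("There is no active flow or task run context.")
    return logging.getLogger(f"toy.{ctx}")


async def upload_body(key: str) -> str:
    """Hypothetical task body that logs through the run logger."""
    get_run_logger_stub().info("uploading %s", key)
    return key


async def call_inside_run(key: str) -> str:
    """What an engine would do: set the context, await the body, then reset it."""
    token = run_context.set("task-run-1")
    try:
        return await upload_body(key)
    finally:
        run_context.reset(token)


if __name__ == "__main__":
    # Analogue of asyncio.run(s3_upload.fn(...)): no context is set, so it fails.
    try:
        asyncio.run(upload_body("B5_key"))
    except RuntimeError as exc:
        print("direct call failed:", exc)
    # With the context set first, the same body succeeds.
    print("inside run:", asyncio.run(call_inside_run("B5_key")))
```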
kushagra kumar
05/10/2022, 7:14 AM
pip install -U "prefect==2.0b1"
and I am encountering the error below on my machine (Ubuntu 20.04.4 LTS).
kushagra kumar
05/10/2022, 7:21 AM
Praveen Chaudhary
05/10/2022, 7:39 AM
kushagra kumar
05/10/2022, 8:01 AM
prefect 2.0. I am facing the error below:
Traceback (most recent call last):
File "/home/kku/work/prefect_poc/env/lib/python3.8/site-packages/prefect/engine.py", line 467, in orchestrate_flow_run
result = await run_sync_in_worker_thread(flow_call)
File "/home/kku/work/prefect_poc/env/lib/python3.8/site-packages/prefect/utilities/asyncio.py", line 52, in run_sync_in_worker_thread
return await anyio.to_thread.run_sync(context.run, call, cancellable=True)
File "/home/kku/work/prefect_poc/env/lib/python3.8/site-packages/anyio/to_thread.py", line 28, in run_sync
return await get_asynclib().run_sync_in_worker_thread(func, *args, cancellable=cancellable,
File "/home/kku/work/prefect_poc/env/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 818, in run_sync_in_worker_thread
return await future
File "/home/kku/work/prefect_poc/env/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 754, in run
result = context.run(func, *args)
File "car_linearregression.py", line 104, in do_regression
X,y = get_feat_and_target(df_car,target)
TypeError: cannot unpack non-iterable PrefectFuture object
It's a simple serial execution where a flow function calls different task functions one after another, very similar to the tutorial below from the official website.
import requests
from prefect import flow, task
@task
def call_api(url):
    response = requests.get(url)
    print(response.status_code)
    return response.json()
@task
def parse_fact(response):
    print(response["fact"])
    return
@flow
def api_flow(url):
    fact_json = call_api(url)
    parse_fact(fact_json)
    return
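The traceback suggests that, in the 2.0 beta, calling a task inside a flow returns a PrefectFuture rather than the task's return value. The tutorial still works because the future is passed straight into another task (parse_fact), and Prefect resolves futures passed as task arguments; `X,y = get_feat_and_target(df_car,target)` instead tries to unpack the future itself in plain flow code. The sketch below reproduces the same kind of TypeError with a standard-library `concurrent.futures.Future` as an analogy; it is not Prefect code, and this `get_feat_and_target` is a hypothetical stand-in. The analogous fix is to resolve the future first (the 2.0 betas expose a `.result()` method on PrefectFuture, though the details have shifted between releases, so check the docs for your version) or to pass it unchanged into the next task.

```python
from concurrent.futures import ThreadPoolExecutor


def get_feat_and_target(rows, target):
    """Hypothetical stand-in: split rows into feature dicts and target values."""
    X = [{k: v for k, v in row.items() if k != target} for row in rows]
    y = [row[target] for row in rows]
    return X, y


rows = [{"hp": 100, "price": 10}, {"hp": 150, "price": 15}]

with ThreadPoolExecutor() as pool:
    future = pool.submit(get_feat_and_target, rows, "price")

    # Unpacking the future object itself fails, like unpacking a PrefectFuture.
    try:
        X, y = future
    except TypeError as exc:
        print("unpacking the future:", exc)

    # Resolving the future to its value first works.
    X, y = future.result()

print(X)  # [{'hp': 100}, {'hp': 150}]
print(y)  # [10, 15]
```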
So far I have tried creating a new virtual env and installing only the minimal packages required to run the ML model, but had no luck. Could you please help me with this?
Jan Domanski
05/10/2022, 8:25 AM
File "/venv/lib/python3.8/site-packages/prefect/orion/orchestration/rules.py", line 534, in __aexit__
await self.after_transition(*exit_context)
File "/venv/lib/python3.8/site-packages/prefect/orion/database/dependencies.py", line 112, in async_wrapper
return await fn(*args, **kwargs)
File "/venv/lib/python3.8/site-packages/prefect/orion/orchestration/core_policy.py", line 190, in after_transition
cache_key = validated_state.state_details.cache_key
AttributeError: 'NoneType' object has no attribute 'state_details'
Any idea how to debug or interpret this?
Ben Muller
05/10/2022, 9:36 AM
Nacho Rodriguez
05/10/2022, 9:55 AM
Elio
05/10/2022, 10:06 AM
Bhupesh Kemar Singh
05/10/2022, 10:35 AM
Danilo Drobac
05/10/2022, 10:39 AM
ModuleNotFoundError: No module named 'markupsafe'
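A ModuleNotFoundError like this often means the package was installed into a different environment from the interpreter that actually runs the flow (markupsafe is a dependency of Jinja2, so it usually arrives indirectly). The sketch below is one hedged way to check which interpreter is active and whether it can see the modules in question; `report_modules` is a hypothetical helper, and the module list is only an example.

```python
import importlib.util
import sys


def report_modules(names):
    """Map each top-level module name to the file it would load from, or None if missing."""
    found = {}
    for name in names:
        spec = importlib.util.find_spec(name)
        found[name] = spec.origin if spec is not None else None
    return found


if __name__ == "__main__":
    print("interpreter:", sys.executable)
    for name, origin in report_modules(["markupsafe", "jinja2", "prefect"]).items():
        print(f"{name:12} -> {origin or 'NOT FOUND in this environment'}")
```

If markupsafe shows as missing, installing it with that same interpreter (`python -m pip install markupsafe`) avoids targeting the wrong environment.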
Arthur Jacquemart
05/10/2022, 11:17 AM
Tony
05/10/2022, 1:36 PM
flow.storage and flow.run_config) and registration flows for my enterprise. Recently we wanted to duplicate all Prefect Cloud UI logging to CloudWatch.
Inside an individual flow I can add this code to get the logs there, but is there a way to do this through a central utility?
import watchtower
from prefect import Flow, context
with Flow("My First Flow") as flow:
    logger = context.get("logger")
    logger.addHandler(
        watchtower.CloudWatchLogHandler(
            log_group_name="prefect-logs",
        )
    )
In other words, would something like this work?
from prefect.utilities.storage import extract_flow_from_file
flow = extract_flow_from_file("path")
flow.logger.addHandler()?
. . .
flow.register()
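One standard-library property that may help with a central utility: loggers propagate records to their ancestors, so a handler attached once to a shared parent logger receives records from every child logger. The sketch assumes, without confirming it for every Prefect 1.x code path, that flow and task loggers are children of the "prefect" logger (the "prefect.S3" name in the registration log further down follows that pattern). `ListHandler` and `attach_central_handler` are hypothetical, with `ListHandler` standing in for watchtower.CloudWatchLogHandler. The handler most likely has to be attached in the process that executes the flow run (for example, in a module the flow file imports), since a handler added in the registration script does not travel with the stored flow.

```python
import logging


class ListHandler(logging.Handler):
    """Hypothetical stand-in for watchtower.CloudWatchLogHandler: stores formatted records."""

    def __init__(self):
        super().__init__()
        self.messages = []

    def emit(self, record):
        self.messages.append(self.format(record))


def attach_central_handler(handler, parent="prefect", level=None):
    """Attach one handler to a shared parent logger so child loggers' records reach it."""
    parent_logger = logging.getLogger(parent)
    if level is not None:
        # Only needed when nothing else has configured the parent logger's level.
        parent_logger.setLevel(level)
    if handler not in parent_logger.handlers:  # avoid duplicates on repeated imports
        parent_logger.addHandler(handler)
    return parent_logger


if __name__ == "__main__":
    handler = ListHandler()
    handler.setFormatter(logging.Formatter("%(name)s | %(message)s"))
    attach_central_handler(handler, level=logging.INFO)

    # Illustrative child loggers; their records propagate up to "prefect".
    logging.getLogger("prefect.FlowRunner").info("flow started")
    logging.getLogger("prefect.S3").info("uploading script")
    print(handler.messages)
    # ['prefect.FlowRunner | flow started', 'prefect.S3 | uploading script']
```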
Jan Domanski
05/10/2022, 2:36 PM
Jason
05/10/2022, 2:37 PM
Bob Colner
05/10/2022, 2:39 PM
prefect-gcp authentication issue/question: I’m trying to follow the example docs, but I get an error when importing GcpCredentials: NameError: name 'SecretManagerServiceClient' is not defined. FYI, the Prefect 1.0 GCP/BigQuery tasks work fine in my environment. Any advice?
Benny Warlick
05/10/2022, 3:59 PM
prefect cloud login --key <MY_KEY> -w <MY_WORKSPACE>
rand="GCS_"$(cat /dev/urandom | tr -cd 'a-f0-9' | head -c 16)
output=$(printf '%s\n' 2 <MY_BUCKET> <MY_PROJECT> $rand | prefect storage create)
storage_id=$(echo $output | grep -oP "(?<=identifier \').+?(?=\')")
prefect storage set-default $storage_id
prefect deployment create my_flow.py
output=$(prefect work-queue create my_queue | grep -oP "(?<=UUID\(\').+?(?=\'\))")
prefect agent start $output
Bob Colner
05/10/2022, 4:14 PM
prefect-gcp issue using the bigquery_insert_stream task: I’m not able to pass Timestamp data types and get: TypeError: Object of type Timestamp is not JSON serializable
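That message comes from Python's json module, which only serializes basic types; a pandas Timestamp is not one of them. A common workaround, sketched below under the assumptions that the rows are plain dicts and that the target TIMESTAMP columns accept ISO 8601 strings, is to convert date and datetime values before passing the records to bigquery_insert_stream. `to_json_safe` is a hypothetical helper; because pandas.Timestamp subclasses datetime.datetime, the isinstance check also covers it.

```python
import json
from datetime import date, datetime, timezone


def to_json_safe(value):
    """Recursively replace date/datetime values (incl. pandas.Timestamp) with ISO 8601 strings."""
    if isinstance(value, dict):
        return {k: to_json_safe(v) for k, v in value.items()}
    if isinstance(value, (list, tuple)):
        return [to_json_safe(v) for v in value]
    if isinstance(value, (datetime, date)):
        return value.isoformat()
    return value


rows = [{"id": 1, "created_at": datetime(2022, 5, 10, 16, 14, tzinfo=timezone.utc)}]
safe_rows = to_json_safe(rows)
print(json.dumps(safe_rows))
# [{"id": 1, "created_at": "2022-05-10T16:14:00+00:00"}]
```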
Josephine Douglas
05/10/2022, 5:42 PM
create_flow_run and wait_for_flow_run (see previous thread). The child flow takes a few hours to run, and in the meantime the parent flow decides that the child must have failed and reports the whole parent flow as failed. Is there a way to extend the timeout period for wait_for_flow_run?
Billy McMonagle
05/10/2022, 5:44 PM
Billy McMonagle
05/10/2022, 5:44 PM
[2022-05-10 17:38:40+0000] INFO - prefect.S3 | Uploading script /codebuild/output/src317307747/src/schedule.py to finance/2022-05-10t17-38-40-057549-00-00 in <bucket>
Traceback (most recent call last):
File "/codebuild/output/src317307747/src/datascience/register.py", line 110, in <module>
register_flow(flow)
File "/codebuild/output/src317307747/src/datascience/register.py", line 93, in register_flow
flow.register(
File "/root/.pyenv/versions/3.9.5/lib/python3.9/site-packages/prefect/core/flow.py", line 1727, in register
registered_flow = client.register(
File "/root/.pyenv/versions/3.9.5/lib/python3.9/site-packages/prefect/client/client.py", line 1244, in register
self.graphql(
File "/root/.pyenv/versions/3.9.5/lib/python3.9/site-packages/prefect/client/client.py", line 570, in graphql
raise ClientError(result["errors"])
prefect.exceptions.ClientError: [{'path': ['set_schedule_active'], 'message': 'unable to perform operation on <TCPTransport closed=True reading=False 0x555efa0e7420>; the handler is closed', 'extensions': {'code': 'INTERNAL_SERVER_ERROR'}}]
Kevin Kho
05/10/2022, 5:54 PM
Billy McMonagle
05/10/2022, 5:55 PM
[2022-05-10 18:13:13+0000] INFO - prefect.S3 | Uploading script /codebuild/output/src944635918/src/schedule.py to path/2022-05-10t18-13-13-230406-00-00 in <bucket>
Traceback (most recent call last):
File "/codebuild/output/src944635918/src/datascience/register.py", line 110, in <module>
register_flow(flow)
File "/codebuild/output/src944635918/src/datascience/register.py", line 93, in register_flow
flow.register(
File "/root/.pyenv/versions/3.9.5/lib/python3.9/site-packages/prefect/core/flow.py", line 1727, in register
registered_flow = client.register(
File "/root/.pyenv/versions/3.9.5/lib/python3.9/site-packages/prefect/client/client.py", line 1176, in register
res = self.graphql(
File "/root/.pyenv/versions/3.9.5/lib/python3.9/site-packages/prefect/client/client.py", line 570, in graphql
raise ClientError(result["errors"])
prefect.exceptions.ClientError: [{'path': ['create_flow_from_compressed_string'], 'message': 'Unable to complete operation', 'extensions': {'code': 'API_ERROR'}}]
Andrew Lawlor
05/10/2022, 6:17 PM
Kevin Kho
05/10/2022, 6:18 PM
Billy McMonagle
05/10/2022, 6:19 PM
Kevin Kho
05/10/2022, 6:21 PM
Billy McMonagle
05/10/2022, 6:25 PM
Anna Geller
05/10/2022, 6:28 PM
Billy McMonagle
05/10/2022, 6:32 PM
Andrew Lawlor
05/10/2022, 6:36 PM
Billy McMonagle
05/10/2022, 6:37 PM
Ben Ayers-Glassey
05/10/2022, 6:38 PM
raise ClientError(result["errors"])
prefect.exceptions.ClientError: [{'path': ['create_flow_from_compressed_string'], 'message': 'Unable to complete operation. An internal API error occurred.', 'extensions': {'code': 'API_ERROR'}}]
...it's now fixed in the sense that I can run flows; however, I can't see their logs.
prefect run with `--watch`:
Looking up flow metadata... Done
Creating run for flow 'bag_wildfire_perimeters_log'... Done
└── Name: axiomatic-kestrel
└── UUID: ca7b3443-26e8-4864-a01f-9a8c4e23cbd0
└── Labels: []
└── Parameters: {'shapefile_url': 'http://opendata.arcgis.com/api/v3/datasets/6bd5dd4e93154f6fae0de1f2c82d95bf_0/downloads/data?format=shp&spatialRefId=4326'}
└── Context: {}
└── URL: https://cloud.prefect.io/zesty-ai/flow-run/ca7b3443-26e8-4864-a01f-9a8c4e23cbd0
Watching flow run execution...
└── 11:32:35 | INFO | Entered state <Scheduled>: Flow run scheduled.
└── 11:32:39 | INFO | Entered state <Submitted>: Submitted for execution
└── 11:33:21 | INFO | Entered state <Running>: Running flow.
└── 11:34:31 | INFO | Entered state <Success>: All reference tasks succeeded.
Flow run succeeded!
...none of the logging my tasks do shows up, nor does it even log which tasks are running.
No logs found. Try expanding or resetting your search.
Click here to retrieve archived logs - please allow up to 30 seconds for the retrieval to take effect.
...if I click there, it says "Retrieving archived logs from glacial storage..." for a while, and then eventually just says:
Sorry, we couldn't get the logs from glacial storage... Retry?
Andrew Lawlor
05/10/2022, 7:05 PM
Anna Geller
05/11/2022, 1:35 PM
Ben Ayers-Glassey
05/11/2022, 3:15 PM