Paulo Maia
11/15/2021, 5:39 PM
Giovanni Giacco
11/15/2021, 6:28 PM
Brad
11/15/2021, 8:34 PM
sqlalchemy.exc.OperationalError: (sqlite3.OperationalError) unable to open database file
when trying to run orion start / reset db. We finally worked out it was because he didn’t have a ~/.prefect
folder, but this was a pretty cryptic error message to receive.
Frank Oplinger
11/15/2021, 9:39 PM
@task
def generate_set_sublists(inspection_id):
    logger = prefect.context.get("logger")
    logger.info("An info message.")
    logger.warning("A warning message.")
But, despite the task completing successfully, I can’t see those logs in the Prefect Cloud API. I do, however, see the ‘Starting task run…’ and ‘Finished task run for task with final state: Success’ logs that seem to be the defaults. I am currently running my flow using the ECS agent on Fargate. Thanks in advance.
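A minimal sketch of the same task with stdout capture turned on, assuming Prefect 1.x; log_stdout is a documented task option, and whether any of these messages reach Cloud also depends on the agent shipping logs (e.g. it must not have been started with --no-cloud-logs):
import prefect
from prefect import task

@task(log_stdout=True)  # log_stdout forwards print() output through the Prefect logger
def generate_set_sublists(inspection_id):
    logger = prefect.context.get("logger")
    logger.info("An info message.")
    logger.warning("A warning message.")
    print("A message captured via log_stdout.")
    return [inspection_id]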
Marwan Sarieddine
11/15/2021, 10:27 PM
kevin
11/15/2021, 10:38 PM
Failed to set task state with error: ClientError([{'path': ['set_task_run_states'], 'message': 'State payload is too large.', 'extensions': {'code': 'INTERNAL_SERVER_ERROR'}}])
Traceback (most recent call last):
File "/usr/local/lib/python3.8/site-packages/prefect/engine/cloud/task_runner.py", line 91, in call_runner_target_handlers
state = self.client.set_task_run_state(
File "/usr/local/lib/python3.8/site-packages/prefect/client/client.py", line 1518, in set_task_run_state
result = self.graphql(
File "/usr/local/lib/python3.8/site-packages/prefect/client/client.py", line 319, in graphql
raise ClientError(result["errors"])
prefect.utilities.exceptions.ClientError: [{'path': ['set_task_run_states'], 'message': 'State payload is too large.', 'extensions': {'code': 'INTERNAL_SERVER_ERROR'}}]
It appears to me that I'm sending a JSON payload that Prefect considers "too large"; however, from what I can see, the payload I'm sending is only on the order of ~100 KB, which intuitively isn't that large. Is there some limit in Prefect that I'm accidentally exceeding, or is there something else in my code I should be investigating that could be causing this?
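A hedged sketch of one way to keep large return values out of the state payload, assuming Prefect 1.x; the bucket name is hypothetical, and the idea is that an external result (here S3Result) checkpoints the data to S3 so the state message only carries a reference:
from prefect import task
from prefect.engine.results import S3Result

@task(result=S3Result(bucket="my-prefect-results"))  # hypothetical bucket name
def build_payload():
    # the returned object is written to S3; the task state no longer embeds the full payload
    return {"rows": list(range(10_000))}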
Matt Alhonte
11/16/2021, 1:14 AM
StartFlowRun
and one of the arguments to it comes from a parameter, and the Task Schematic doesn't even show that it exists, i.e.:
signal = Parameter(
    "signal", default="1a,1b,2a,2b,1c,1d,2d,1e,2e,1f,2f,1z,3a,3b"
)
nba1a_results = StartSkippableRun(
    signal=signal,
    nb_id="1a",
    flow_name="TestNBA1a",
    project_name=project_name,
    wait=True,
    parameters=task_parameters,
)
signal
doesn't seem to register.
John T
11/16/2021, 2:46 AM
PrefectResult requires the result to be JSON serializable. After digging into where PrefectResult
was being used, we realized that we could change the serializer from JSONSerializer()
to PickleSerializer()
and have our pydantic objects move around seamlessly. We are, however, curious whether changing the PrefectResult serializer to a pickling one can introduce unforeseen consequences.
Thanks in advance for any clarification!
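For reference, a hedged sketch of what that swap can look like, assuming Prefect 1.x where PrefectResult accepts a serializer argument; the pickled (and therefore opaque, version-sensitive) bytes are what end up stored in the backend, which is the main thing to watch:
from prefect import task
from prefect.engine.results import PrefectResult
from prefect.engine.serializers import PickleSerializer

@task(result=PrefectResult(serializer=PickleSerializer()))  # values pickled instead of JSON-encoded
def build_config():
    return {"threshold": 0.5}  # stand-in for a pydantic object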
Sergey
11/16/2021, 8:20 AM
Chris Arderne
11/16/2021, 10:32 AM
prefect run -p …
) or registered/from Cloud? Context in this issue.
Huw Ringer
11/16/2021, 10:55 AM
prefect.context['my_run_parameter']
in my Python code results in the following runtime error:
Failed to load and execute Flow's environment: KeyError('my_run_parameter',)
I’ve tried lots of variations on this syntax, e.g.
prefect.context.get('my_run_parameter')
but nothing seems to work. Any advice/assistance you can provide would be gratefully received, thanks!
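A hedged sketch, assuming Prefect 1.x: run-time parameters usually live under prefect.context["parameters"] rather than as top-level context keys, and the more idiomatic route is a Parameter task; my_run_parameter is the name from the question:
import prefect
from prefect import task, Flow, Parameter

@task
def show_param(value):
    logger = prefect.context.get("logger")
    # parameters supplied at run time are nested under the "parameters" key of the context
    logger.info("from context: %s", prefect.context.get("parameters", {}).get("my_run_parameter"))
    logger.info("from Parameter task: %s", value)

with Flow("param-example") as flow:
    my_run_parameter = Parameter("my_run_parameter", default="hello")
    show_param(my_run_parameter)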
Amanda Wee
11/16/2021, 11:02 AM
John Shearer
11/16/2021, 11:35 AM
Dekel R
11/16/2021, 2:32 PM
init_logger()
log = logging.getLogger(LOGGER_NAME)

def data_flow():
    log.info('Running the data api workflow')
    with Flow("data_to_parquet_flow", storage=Docker(registry_url="us-central1/something/xx",
                                                     dockerfile="./Dockerfile", stored_as_script=True,
                                                     path='path_to_the_current_file.py')) as flow:
        json_bucket_prefix = fetch_data(start_date=start_date, end_date=end_date, token=token)
        parse_jsons_to_parquet(files_prefix=json_bucket_prefix)
    return flow
def run_local():
    flow = data_flow()
    flow.run()

def run_remotely():
    flow = data_flow()
    client = prefect.Client(api_key=prefect_token)
    client.register(flow, project_name=PREFECT_PROJECT_NAME)

if __name__ == '__main__':
    try:
        start_date, end_date, token, prefect_token = get_workflow_env_vars()
        if prefect_token:
            run_remotely()
        else:
            run_local()
    except Exception as e:
        log.error(e, exc_info=True)
Now I’m running the script, and most of the build goes as planned, but then I get this error (after “init_logger()” runs as expected):
Traceback (most recent call last):
File "/opt/prefect/healthcheck.py", line 150, in <module>
flows = import_flow_from_script_check(flow_file_paths)
File "/opt/prefect/healthcheck.py", line 63, in import_flow_from_script_check
flows.append(extract_flow_from_file(file_path=flow_file_path))
File "/usr/local/lib/python3.8/site-packages/prefect/utilities/storage.py", line 104, in extract_flow_from_file
raise ValueError("No flows found in file.")
ValueError: No flows found in file.
This is my Dockerfile:
FROM python:3.8-slim
WORKDIR /app
COPY ./requirements.txt /app/requirements.txt
RUN pip install -r requirements.txt
ENV PYTHONPATH /app/
COPY ./ /app/
It is in the same directory hierarchy as the file I posted at the top.
Has anyone had this issue? Any help would be appreciated.
Thanks!
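For what it's worth, a hedged sketch of a restructuring that usually satisfies the Docker storage healthcheck when stored_as_script=True: the healthcheck imports the file and scans it for Flow objects, so the flow has to exist at module level rather than only inside data_flow(); fetch_data, parse_jsons_to_parquet, start_date, end_date and token are the names from the script above:
# path_to_the_current_file.py -- sketch only, reusing the tasks and variables defined earlier
from prefect import Flow
from prefect.storage import Docker

storage = Docker(
    registry_url="us-central1/something/xx",
    dockerfile="./Dockerfile",
    stored_as_script=True,
    path="path_to_the_current_file.py",
)

# built at import time, so extract_flow_from_file can find it
with Flow("data_to_parquet_flow", storage=storage) as flow:
    json_bucket_prefix = fetch_data(start_date=start_date, end_date=end_date, token=token)
    parse_jsons_to_parquet(files_prefix=json_bucket_prefix)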
Jean-Baptiste Six
11/16/2021, 4:31 PM
@task
async def mytask1():
    await do_smth()

@task
async def mytask2():
    await do_smth()

async def my_flow():
    with Flow("Test") as flow:
        await mytask1()
        mytask2()
    flow.run()
The goal is to be able to execute async functions; the flows themselves will run synchronously with respect to each other.
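A hedged sketch of a common workaround, assuming Prefect 1.x (which calls task run methods synchronously): keep the tasks themselves synchronous and drive the coroutine with asyncio.run inside each one; do_smth here is a stand-in for the real coroutine:
import asyncio
from prefect import task, Flow

async def do_smth():
    await asyncio.sleep(1)  # stand-in for the real async work

@task
def mytask1():
    # Prefect 1.x invokes tasks synchronously, so run the coroutine to completion here
    return asyncio.run(do_smth())

@task
def mytask2():
    return asyncio.run(do_smth())

with Flow("Test") as flow:
    mytask1()
    mytask2()

if __name__ == "__main__":
    flow.run()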
Dimas Gonzales
11/16/2021, 4:47 PM
Hugo Shi
11/16/2021, 5:50 PM
Hugo Kitano
11/16/2021, 5:56 PM
John Muehlhausen
11/16/2021, 5:58 PM
Constantino Schillebeeckx
11/16/2021, 6:51 PM
StartFlowRun
legacy code? This seems to suggest it is; however, it's used throughout the current docs. Wondering if it's getting phased out.
Kendal Burkhart
11/16/2021, 7:33 PM
with Flow("api") as flow:
    client = get_client(api_key, api_secret)
    main = get_data(client)
    mapped = get_mapped_data.map(prefect.unmapped(client), main)
The issue is that sometimes the flow can run for more than an hour, at which point the access token
for the API expires (and one hour is the maximum the issuer allows). I can catch the error in a mapped task and try doing a token refresh,
but I'm unsure whether the new token will be available to the mapped tasks,
whether each mapped task will end up requesting its own new token, or something
else entirely.
I guess I'm asking what the best way is to address this problem.
Thanks!
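One pattern worth sketching (hypothetical helper names, mirroring get_client and get_mapped_data from the snippet above): pass the long-lived credentials into the map and build the client, and therefore a fresh token, inside each mapped task, so no child task relies on a token minted at flow start:
from prefect import task, Flow, unmapped

@task
def get_items():
    # stand-in for get_data(client): return the work items to map over
    return list(range(100))

@task
def get_mapped_data(creds, item):
    # hypothetical: build the client (and a fresh access token) inside the mapped task,
    # so a map that outlives the one-hour token never reuses a stale one
    client = get_client(creds["key"], creds["secret"])  # get_client is the factory from the question
    return client.get(item)  # assumed client method, for illustration only

with Flow("api") as flow:
    creds = {"key": "api_key", "secret": "api_secret"}  # long-lived credentials, not a token
    items = get_items()
    mapped = get_mapped_data.map(unmapped(creds), items)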
Constantino Schillebeeckx
11/16/2021, 7:49 PM
genesys__fof
(notice the double underscore) to be listed first. Am I missing something?
Matt Alhonte
11/16/2021, 8:12 PM
bral
11/16/2021, 8:15 PM
client.create_flow_run(
    flow_id="d7bfb996-b8fe-4055-8d43-2c9f82a1e3c7",
    flow_run_name="docs example hello-world"
)
But when a run is triggered by Prefect, the names are random. Alternatively, you can set flow_run_name manually from the UI.
My typical Prefect script looks like:
@task
def task1():
    pass

@task
def task2():
    pass

with Flow() as flow:
    task1()
    task2()
...
client.register(flow, project_name='project')
Is there a way to set flow_run_name by default in code?
For example:
name = f"name_{datetime.datetime.utcnow()}"
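A hedged sketch of one way to do this from code, assuming Prefect 1.x ships the RenameFlowRun task under prefect.tasks.prefect (worth double-checking the import path): compute the name inside a task so the timestamp is evaluated at run time, then rename the current run as the first step of the flow:
import datetime
from prefect import task, Flow
from prefect.tasks.prefect import RenameFlowRun  # assumed import path

@task
def make_run_name():
    # evaluated at run time, so every run gets its own timestamped name
    return f"name_{datetime.datetime.utcnow().isoformat()}"

@task
def task1():
    pass

@task
def task2():
    pass

rename_run = RenameFlowRun()

with Flow("project-flow") as flow:
    rename_run(flow_run_name=make_run_name())
    task1()
    task2()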
Aqib Fayyaz
11/16/2021, 8:19 PM
Aqib Fayyaz
11/16/2021, 8:25 PM
Max Kolasinski
11/16/2021, 9:07 PM
Vamsi Reddy
11/16/2021, 9:26 PM
bucketname/dev
and bucketname/prod
and have our dev and prod flows stored accordingly. I am using storage=S3(bucket="bucketname")
but would like something like storage=S3(bucket="bucketname/dev").
Any idea how to do this?
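A hedged sketch, assuming Prefect 1.x S3 storage: the bucket argument takes only the bucket name, but the key argument can carry a prefix, which is one way to express dev/ and prod/ subfolders (the key values below are just examples):
from prefect.storage import S3

# bucket stays "bucketname"; the environment prefix goes into the object key,
# so the stored flow lands under dev/ or prod/ inside the same bucket
dev_storage = S3(bucket="bucketname", key="dev/my_flow")
prod_storage = S3(bucket="bucketname", key="prod/my_flow")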
Alexander Chen
11/16/2021, 9:42 PM
Carlo
11/16/2021, 9:46 PM