Nikolaus Landgraf
07/06/2022, 2:12 PMsqlalchemy.exc.OperationalError: (sqlite3.OperationalError) no such table: json_each
[SQL: INSERT INTO flow_run_notification_queue (flow_run_notification_policy_id, flow_run_state_id) SELECT flow_run_notification_policy.id, CAST(? AS CHAR(36)) AS anon_1
FROM flow_run_notification_policy
WHERE flow_run_notification_policy.is_active IS 1 AND (flow_run_notification_policy.state_names = ? OR EXISTS (SELECT 1
FROM json_each(flow_run_notification_policy.state_names) AS json_each
WHERE json_each.value IN (?))) AND (flow_run_notification_policy.tags = ? OR EXISTS (SELECT 1
FROM json_each(flow_run_notification_policy.tags) AS json_each
WHERE json_each.value IN (SELECT 1 FROM (SELECT 1) WHERE 1!=1)))]
[parameters: ('8d01934d-c9c8-4f23-a3af-ec03ee1586a0', '[]', 'Pending', '[]')]
(Background on this error at: <https://sqlalche.me/e/14/e3q8>)
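Note: "no such table: json_each" usually means the SQLite library that Python is linked against was built without the JSON functions. Below is a minimal check that uses only the standard library. The helper name sqlite_supports_json_each is hypothetical and not part of Prefect or SQLAlchemy.

```python
import sqlite3


def sqlite_supports_json_each() -> bool:
    """Return True if this Python's SQLite build can run json_each()."""
    conn = sqlite3.connect(":memory:")
    try:
        rows = conn.execute("SELECT value FROM json_each('[1, 2]')").fetchall()
        return [row[0] for row in rows] == [1, 2]
    except sqlite3.OperationalError:
        # Raised as "no such table: json_each" when JSON support is missing.
        return False
    finally:
        conn.close()


if __name__ == "__main__":
    print("SQLite library version:", sqlite3.sqlite_version)
    print("json_each available:", sqlite_supports_json_each())
```

If this prints False, the likely fix is a Python environment linked against a newer SQLite build, though that is an assumption about this particular setup.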
Muddassir Shaikh
07/06/2022, 2:15 PMgraphql_1 | ValueError: Uniqueness violation.
graphql_1 |
graphql_1 | The above exception was the direct cause of the following exception:
graphql_1 |
graphql_1 | Traceback (most recent call last):
graphql_1 | File "/usr/local/lib/python3.7/site-packages/graphql/execution/execute.py", line 674, in await_completed
graphql_1 | return await completed
graphql_1 | File "/usr/local/lib/python3.7/site-packages/graphql/execution/execute.py", line 659, in await_result
graphql_1 | return_type, field_nodes, info, path, await result
graphql_1 | File "/usr/local/lib/python3.7/site-packages/graphql/execution/execute.py", line 733, in complete_value
graphql_1 | raise result
graphql_1 | File "/usr/local/lib/python3.7/site-packages/graphql/execution/execute.py", line 628, in await_result
graphql_1 | return await result
graphql_1 | File "/prefect-server/src/prefect_server/graphql/extensions.py", line 52, in resolve
graphql_1 | result = await result
graphql_1 | File "/prefect-server/src/prefect_server/graphql/tenants.py", line 14, in resolve_create_tenant
graphql_1 | "id": await api.tenants.create_tenant(name=input["name"], slug=input["slug"])
graphql_1 | File "/prefect-server/src/prefect_server/api/tenants.py", line 29, in create_tenant
graphql_1 | tenant_id = await models.Tenant(name=name, slug=slug).insert()
graphql_1 | File "/prefect-server/src/prefect_server/database/orm.py", line 222, in insert
graphql_1 | insert_mutation_name=self.__root_fields__.get("insert"),
graphql_1 | File "/prefect-server/src/prefect_server/database/hasura.py", line 237, in insert
graphql_1 | result = await self.execute_mutations_in_transaction(mutations=[graphql])
graphql_1 | File "/prefect-server/src/prefect_server/database/hasura.py", line 165, in execute_mutations_in_transaction
graphql_1 | as_box=as_box,
graphql_1 | File "/prefect-server/src/prefect_server/database/hasura.py", line 89, in execute
graphql_1 | raise ValueError("Uniqueness violation.")
graphql_1 | graphql.error.graphql_error.GraphQLError: Uniqueness violation.
graphql_1 |
graphql_1 | GraphQL request:2:3
graphql_1 | 1 | mutation ($input: create_tenant_input!) {
graphql_1 | 2 | create_tenant(input: $input) {
graphql_1 | | ^
graphql_1 | 3 | id
Vadym Dytyniak
07/06/2022, 2:23 PM
Ibrahim Sherif
07/06/2022, 3:47 PM
kevin
07/06/2022, 5:34 PM.csv file from a Google Cloud bucket. It seems like this task is the correct way to go? An alternative I can see is just implementing a Python callable that uses the Python library for Google Cloud Storage. I'd like some feedback on whether I'm approaching this problem correctly.
Amogh Kulkarni
07/06/2022, 6:21 PM
Josh Paulin
07/06/2022, 6:50 PMSuccess state, and the results file has been overwritten. If I’m understanding correctly, this should be completing with Cached. Any suggestions?
Jon Ruhnke
07/06/2022, 7:19 PM
Kyle McChesney
07/06/2022, 8:46 PM-l (I want a login shell). It seems like it's currently passing the command like so:
overrides
containerOverrides
"command": [
    "/bin/sh",
    "-c",
    "prefect execute flow-run"
]
Also, possible or not, is this command stable / is it safe to customize this?
Ryan Sattler
07/07/2022, 4:37 AM
yu zeng
07/07/2022, 5:16 AM
Shivam Bhatia
07/07/2022, 6:34 AM
Alvaro Durán Tovar
07/07/2022, 7:33 AMRunNamespacedJob task. Because of a known bug, I'm obtaining the logs via ReadNamespacedPodLogs. All of that is working well. Recently I changed the pod spec to run the application as an initContainer, with a Docker image that captures the results as the actual container. So I have an image that generates content and logs in the initContainer, and a container that processes the produced content (basically uploading it to GCS).
Problem: the logs from the initContainer are not being captured by Prefect, only the logs from the container. Any recommendation? 🙏
Rainer Schülke
07/07/2022, 8:04 AMweeks = Parameter("weeks", required=False, default=None)
Afterwards there is a case block where the value is either pulled from the last actual weeks or set to the provided Parameter. Locally it works like a charm, but when I execute the flow in the cloud, I get an error for the weeks task:
The following error messages were provided by the GraphQL server: INTERNAL_SERVER_ERROR: Variable "$input" got invalid value null at "input.states[0].task_run_id"; Expected non-nullable type UUID! not to be null. The GraphQL query was: mutation($input: set_task_run_states_input!) { set_task_run_states(input: $input) { states { status message id } } } The passed variables were: {"input": {"states": [{"state": {"context": {"tags": []}, "cached_inputs": {}, "message": "Starting task run.", "_result": {"__version__": "0.14.10", "type": "NoResultType"}, "__version__": "0.14.10", "type": "Running"}, "task_run_id": null, "version": null}]}}
Task 'weeks': Finished task run for task with final state: 'ClientFailed'
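Note on the null in the error above: the variables are sent as JSON, and JSON spells Python's None as null. The field the server rejects is input.states[0].task_run_id, not the weeks default. A small sketch of the None/null round trip, using a reduced payload made up for illustration:

```python
import json

# Reduced, illustrative payload; the field names mirror the logged variables.
payload = {"task_run_id": None, "version": None}

encoded = json.dumps(payload)
print(encoded)  # {"task_run_id": null, "version": null}

# Decoding turns null back into None.
decoded = json.loads(encoded)
print(decoded["task_run_id"] is None)  # True
```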
Do you know what might be the problem here? Why is it stated that I got a null value? It should be None.
Dennis Hinnenkamp
07/07/2022, 9:06 AM
Christian Vogel
07/07/2022, 10:59 AM(begin_task_run pid=141324) ImportError: cannot import name 'SubprocessFlowRunner' from partially initialized module 'prefect.flow_runners' (most likely due to a circular import) (/home/christian/Documents/ray_and_prefect/env/lib/python3.9/site-packages/prefect/flow_runners/__init__.py)
I am using the following dependencies: prefect==2.0b7 prefect-ray==0.1.0 ray==1.13.0
Apparently I am doing something wrong with my dependencies or when I am importing them. Do you have any idea?
Marcin Grzybowski
07/07/2022, 12:10 PM
Robert Kowalski
07/07/2022, 1:06 PMmap function fail, but work when map is replaced with bind?
from prefect import task, Flow, unmapped

@task
def first():
    return range(10)

@task
def second():
    return True

@task
def third(numbers):
    print(numbers)

with Flow(name='test') as flow:
    numbers = first()
    second_result = second()
    # third.bind(numbers=numbers, upstream_tasks=[second_result])
    third.map(numbers=unmapped(numbers), upstream_tasks=[second_result])

flow.run()
Kyle McChesney
07/07/2022, 2:38 PM
Keith Veleba
07/07/2022, 4:29 PMUnexpected error: AttributeError("'S3Result' object has no attribute 'upload_options'")
Traceback (most recent call last):
File "/usr/local/lib/python3.8/site-packages/prefect/engine/runner.py", line 48, in inner
new_state = method(self, state, *args, **kwargs)
File "/usr/local/lib/python3.8/site-packages/prefect/engine/task_runner.py", line 930, in get_task_run_state
result = self.result.write(value, **formatting_kwargs)
File "/usr/local/lib/python3.8/site-packages/prefect/engine/results/s3_result.py", line 89, in write
ExtraArgs=self.upload_options,
AttributeError: 'S3Result' object has no attribute 'upload_options'
Attached is one of the flows that are failing. Does the task running code record execution state back to the storage bucket?
Thanks in advance!
Britt Evans
07/07/2022, 4:35 PM
Rajvir Jhawar
07/07/2022, 6:55 PMkubectl configured to connect to a cluster
Xavier Witdouck
07/07/2022, 7:04 PM
Matthew Seligson
07/07/2022, 7:18 PM
Michal Baumgartner
07/07/2022, 7:29 PMprefect.storage.Module) would be available in future releases. We're running a Prefect + Dask setup on k8s from a monorepo (i.e. agent and Dask worker deployments run the same Docker image and tag), where flows are sourced from Python modules (and registered before agents start up during a new release). I'd therefore prefer not to use any object storage for storing flows or results, if possible.
Adam
07/07/2022, 7:46 PM
Alex Tam
07/07/2022, 7:54 PM
redsquare
07/07/2022, 7:58 PM
Ifeanyi Okwuchi
07/07/2022, 8:00 PMUnexpected error: KeyError(0)
Traceback (most recent call last):
File "/usr/local/lib/python3.8/site-packages/prefect/engine/runner.py", line 48, in inner
new_state = method(self, state, *args, **kwargs)
File "/usr/local/lib/python3.8/site-packages/prefect/engine/flow_runner.py", line 569, in get_flow_run_state
executors.prepare_upstream_states_for_mapping(
File "/usr/local/lib/python3.8/site-packages/prefect/utilities/executors.py", line 682, in prepare_upstream_states_for_mapping
value = upstream_state.result[i]
KeyError: 0
Paul Stark
07/07/2022, 10:08 PMsnowflake_query = SnowflakeQuery(
    max_retries=cfg.retry_max,
    retry_delay=datetime.timedelta(seconds=cfg.retry_delay_seconds),
)
sql_queries = cfg.sql_query.split(';')
with Flow(
    getenv('FLOW_NAME')
) as flow:
    run_snowflake_query = snowflake_query(
        account=PrefectSecret("SNOWFLAKE_ACCOUNT"),
        user=PrefectSecret(f"{environ['SNOWFLAKE_USER_VAR']}"),
        password=PrefectSecret(f"{environ['SNOWFLAKE_PASSWORD_VAR']}"),
        warehouse=cfg.snowflake_warehouse,
    ).map(query=sql_queries)
sql_queries is getting put into a list since I can loop through it. Any thoughts on what I am doing incorrectly?
Kevin Kho
07/07/2022, 10:12 PMsnowflake_query = SnowflakeQuery(
    max_retries=cfg.retry_max,
    retry_delay=datetime.timedelta(seconds=cfg.retry_delay_seconds),
)
is already an init and then
snowflake_query(
    account=PrefectSecret("SNOWFLAKE_ACCOUNT"),
    user=PrefectSecret(f"{environ['SNOWFLAKE_USER_VAR']}"),
    password=PrefectSecret(f"{environ['SNOWFLAKE_PASSWORD_VAR']}"),
    warehouse=cfg.snowflake_warehouse,
).map(query=sql_queries)
is 2 more.
I know the attempt here is to initialize with secrets, but you can’t do that because init is called during registration and secrets are only fed in at runtime. Instead, pass the account, user, and password in the map call itself, wrapping each in unmapped.
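To make the mapped versus unmapped distinction concrete, here is a toy model in plain Python. It is not Prefect's implementation: Unmapped, toy_map, and run_query are hypothetical names invented for this sketch, and run_query only formats its inputs.

```python
from typing import Any, Callable, List


class Unmapped:
    """Hypothetical marker: pass the wrapped value unchanged to every call."""

    def __init__(self, value: Any) -> None:
        self.value = value


def toy_map(fn: Callable[..., Any], **kwargs: Any) -> List[Any]:
    """Call fn once per element of the mapped arguments.

    Arguments wrapped in Unmapped are reused as-is for every call.
    """
    mapped = {k: list(v) for k, v in kwargs.items() if not isinstance(v, Unmapped)}
    constant = {k: v.value for k, v in kwargs.items() if isinstance(v, Unmapped)}
    if not mapped:
        raise ValueError("at least one argument must be mapped")
    lengths = {len(v) for v in mapped.values()}
    if len(lengths) != 1:
        raise ValueError("mapped arguments must have equal length")
    count = lengths.pop()
    return [
        fn(**{k: v[i] for k, v in mapped.items()}, **constant)
        for i in range(count)
    ]


def run_query(query: str, account: str) -> str:
    # Stand-in for a task's run(); no database is contacted.
    return f"{account}: {query.strip()}"


results = toy_map(
    run_query,
    query="SELECT 1; SELECT 2".split(";"),
    account=Unmapped("demo_account"),
)
print(results)  # ['demo_account: SELECT 1', 'demo_account: SELECT 2']
```

The point of the sketch is that the query list is split across calls while the account is handed to each call whole, which is the role unmapped plays in the map call described above.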
Paul Stark
07/07/2022, 10:14 PMTask 'SnowflakeQuery': Exception encountered during task execution!
Traceback (most recent call last):
File "/usr/local/lib/python3.10/dist-packages/prefect/engine/task_runner.py", line 880, in get_task_run_state
value = prefect.utilities.executors.run_task_with_timeout(
File "/usr/local/lib/python3.10/dist-packages/prefect/utilities/executors.py", line 468, in run_task_with_timeout
return task.run(*args, **kwargs) # type: ignore
File "/usr/local/lib/python3.10/dist-packages/prefect/utilities/tasks.py", line 456, in method
return run_method(self, *args, **kwargs)
File "/usr/local/lib/python3.10/dist-packages/prefect/tasks/snowflake/snowflake.py", line 152, in run
raise ValueError("A query string must be provided")
ValueError: A query string must be provided
Kevin Kho
07/07/2022, 10:15 PM
Paul Stark
07/07/2022, 10:16 PM
Kevin Kho
07/07/2022, 10:19 PMsnowflake_query = SnowflakeQuery(
    max_retries=cfg.retry_max,
    retry_delay=datetime.timedelta(seconds=cfg.retry_delay_seconds),
)
with Flow(
    getenv('FLOW_NAME')
) as flow:
    run_snowflake_query = snowflake_query(
        query=query,
        account=PrefectSecret("SNOWFLAKE_ACCOUNT"),
        user=PrefectSecret(f"{environ['SNOWFLAKE_USER_VAR']}"),
        password=PrefectSecret(f"{environ['SNOWFLAKE_PASSWORD_VAR']}"),
        warehouse=cfg.snowflake_warehouse,
    )
will work because it’s just called once after init, but the map is a call also so it should be:
with Flow(
    getenv('FLOW_NAME')
) as flow:
    run_snowflake_query = snowflake_query.map(
        query=list_of_queries,
        account=unmapped(PrefectSecret("SNOWFLAKE_ACCOUNT")),
        user=unmapped(PrefectSecret(f"{environ['SNOWFLAKE_USER_VAR']}")),
        password=unmapped(PrefectSecret(f"{environ['SNOWFLAKE_PASSWORD_VAR']}")),
        warehouse=unmapped(cfg.snowflake_warehouse),
    )
@task
def abc(x):
    return x+1

with Flow(..) as flow:
    abc().map([1,2,3])
The right syntax is:
with Flow(..) as flow:
    abc.map([1,2,3])
Paul Stark
07/07/2022, 10:42 PMsnowflake_query = SnowflakeQuery(
    max_retries=cfg.retry_max,
    retry_delay=datetime.timedelta(seconds=cfg.retry_delay_seconds),
)
sql_queries = cfg.sql_query.split(';')
with Flow(
    getenv('FLOW_NAME')
) as flow:
    run_snowflake_query = snowflake_query().map(
        query=sql_queries,
        account=unmapped(PrefectSecret("SNOWFLAKE_ACCOUNT")),
        user=unmapped(PrefectSecret(f"{environ['SNOWFLAKE_USER_VAR']}")),
        password=unmapped(PrefectSecret(f"{environ['SNOWFLAKE_PASSWORD_VAR']}")),
        warehouse=cfg.snowflake_warehouse,
    )
Error
Task 'SnowflakeQuery': Exception encountered during task execution!
Traceback (most recent call last):
File "/usr/local/lib/python3.10/dist-packages/prefect/engine/task_runner.py", line 880, in get_task_run_state
value = prefect.utilities.executors.run_task_with_timeout(
File "/usr/local/lib/python3.10/dist-packages/prefect/utilities/executors.py", line 468, in run_task_with_timeout
return task.run(*args, **kwargs) # type: ignore
File "/usr/local/lib/python3.10/dist-packages/prefect/utilities/tasks.py", line 456, in method
return run_method(self, *args, **kwargs)
File "/usr/local/lib/python3.10/dist-packages/prefect/tasks/snowflake/snowflake.py", line 148, in run
raise ValueError("An account must be provided")
ValueError: An account must be provided
Kevin Kho
07/07/2022, 10:46 PMwith Flow(
    getenv('FLOW_NAME')
) as flow:
    run_snowflake_query = snowflake_query().map(
        query=sql_queries,
        account=unmapped(PrefectSecret("SNOWFLAKE_ACCOUNT")),
        user=unmapped(PrefectSecret(f"{environ['SNOWFLAKE_USER_VAR']}")),
        password=unmapped(PrefectSecret(f"{environ['SNOWFLAKE_PASSWORD_VAR']}")),
        warehouse=cfg.snowflake_warehouse,
    )
which attempts to call the task. It should just be:
snowflake_query.map()
Paul Stark
07/07/2022, 11:02 PMsnowflake_query = SnowflakeQuery(
    max_retries=cfg.retry_max,
    retry_delay=datetime.timedelta(seconds=cfg.retry_delay_seconds),
    warehouse=cfg.snowflake_warehouse,
)
sql_queries = cfg.sql_query.split(';')
with Flow(
    getenv('FLOW_NAME')
) as flow:
    run_snowflake_query = snowflake_query.map(
        query=sql_queries,
        account=unmapped(PrefectSecret("SNOWFLAKE_ACCOUNT")),
        user=unmapped(PrefectSecret(f"{environ['SNOWFLAKE_USER_VAR']}")),
        password=unmapped(PrefectSecret(f"{environ['SNOWFLAKE_PASSWORD_VAR']}")),
    )
Kevin Kho
07/07/2022, 11:03 PM