Louis Amon
08/29/2022, 1:13 PM
Johnathan Nguyen
08/29/2022, 1:33 PM
Scott White
08/29/2022, 1:34 PM
prefect<2.0 and the functional API?
I will share the pseudo-code and syntax in a reply.
José Duarte
08/29/2022, 1:47 PM
import prefect
import logging

logger = logging.getLogger(__name__)

@prefect.flow
def f():
    logger.info("test")  # doesn't appear
What am I doing wrong?
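A minimal sketch of the usual fix, assuming Prefect 2's get_run_logger API: inside a flow run, use the run logger rather than a module-level stdlib logger (standard loggers only reach Orion if named in the PREFECT_LOGGING_EXTRA_LOGGERS setting):

import prefect
from prefect import get_run_logger

@prefect.flow
def f():
    # the run logger is bound to the active flow run, so its records
    # show up in the flow run logs in the UI
    logger = get_run_logger()
    logger.info("test")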
José Duarte
08/29/2022, 2:23 PM
Michael Law
08/29/2022, 3:54 PM
Ilya Galperin
08/29/2022, 3:55 PM
from prefect import flow

@flow(name="always-fail")
def entrypoint():
    raise ValueError("my error message")

if __name__ == "__main__":
    state = entrypoint(return_state=True)
    print(state)
    print(state.result)
In this scenario, state.result can give us the detailed error message for passing back into Slack, which seems really useful. However, it'd also be great if we could provide a direct link to the flow run using the flow's run id. How are other folks implementing this, and is there a way to directly access the flow's run id after calling the flow from main (or otherwise)?
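One way to get at the run id, sketched under the assumption that Prefect 2's run context and state APIs behave as documented: read it from get_run_context() inside the flow, or from the returned state's state_details afterwards:

from prefect import flow
from prefect.context import get_run_context

@flow(name="always-fail")
def entrypoint():
    # inside the flow, the active context carries the flow run's id
    run_id = get_run_context().flow_run.id
    print(f"flow run id: {run_id}")
    raise ValueError("my error message")

if __name__ == "__main__":
    state = entrypoint(return_state=True)
    # the returned state also carries the id, usable to build a UI link
    print(state.state_details.flow_run_id)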
José Duarte
08/29/2022, 4:07 PM
prefect.exceptions.MissingContextError: Logger 'dystematic.pdbt.ftp' attempted to send logs to Orion without a flow run id. The Orion log handler can only send logs within flow run contexts unless the flow run id is manually provided.
This isn't covered in the post or the docs.
The reason I want to use the regular Python logs is that I cannot use the Prefect one when launching from the console, as I get:
RuntimeError: There is no active flow or task run context.
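A common pattern for code that runs both inside and outside a flow, as a sketch (relying on prefect.exceptions.MissingContextError, which the traceback above already names): fall back to a plain stdlib logger when no run context is active:

import logging

from prefect import get_run_logger
from prefect.exceptions import MissingContextError

def get_logger():
    try:
        # use the run logger when a flow/task run context is active
        return get_run_logger()
    except (MissingContextError, RuntimeError):
        # launched from the console: no active context, fall back
        return logging.getLogger(__name__)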
Tomás Emilio Silva Ebensperger
08/29/2022, 4:39 PM
Nick Coy
08/29/2022, 4:41 PM
UserWarning: Block document has schema checksum sha256:dbeeaf09aa78947a7c576549b11e098c00a25bcbbf90b8d8b70c0c3a3fc8f4a2 which does not match the schema checksum for class 'Process'. This indicates the schema has changed and this block may not load.
When I go to the bucket I see the flow files there and the flow runs fine.
Numline1
08/29/2022, 6:20 PM
Numline1
08/29/2022, 6:20 PM
Maggie Dart-Padover
08/29/2022, 7:25 PM
OSError: could not get source code
del the_task does remove it from my environment, but if I then run the task definition again I still get the error. I'm not totally sure if this is a bug or expected behavior. Is my workflow here just wrong for Prefect?
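For what it's worth, OSError: could not get source code is typically raised by Python's inspect.getsource when a function's source can't be recovered from where it was defined (that this is the mechanism here is an assumption); a quick way to check whether the environment is the culprit:

import inspect

def the_task():
    ...

# if this raises OSError("could not get source code"), the environment
# cannot recover the function's source, and anything downstream that
# needs the source will fail the same way
print(inspect.getsource(the_task))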
Sam Garvis
08/29/2022, 7:27 PM
prefect deployment build name.name -n name_dev -t dev_wq_k8s -sb gcs/dev --work-queue=dev_wq_k8s -ib kubernetes-job/dev-k8s-job
I get this error:
Job is missing required attributes at the following paths: /apiVersion, /kind, /metadata, /spec (type=value_error)
even though the Prefect 2.0 UI for creating a k8s job block says Job (Optional).
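For reference, the four paths the validator complains about are the top-level keys of a Kubernetes Job manifest; a minimal sketch (the values below are illustrative assumptions, not Prefect's actual base template):

# minimal Kubernetes Job manifest carrying the four required paths
job_manifest = {
    "apiVersion": "batch/v1",
    "kind": "Job",
    "metadata": {"labels": {}},
    "spec": {
        "template": {
            "spec": {
                "containers": [{"name": "prefect-job"}],
                "restartPolicy": "Never",
            }
        }
    },
}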
ash
08/29/2022, 8:41 PM
Richard Freeman
08/29/2022, 10:30 PM
Nace Plesko
08/29/2022, 11:04 PM
ShellTask is not logging all the output from the script it's running. I am looking at https://docs-v1.prefect.io/api/latest/tasks/shell.html#shelltask and it seems like just setting stream_output=True should do the job, but I'm still seeing just Command failed with exit code 1. I also tried setting return_all=True and log_stderr=True, but the behavior is the same.
Has anyone run into this issue in the past?
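For comparison, a minimal Prefect 1 sketch with all three options set (the command is hypothetical); per the v1 docs, stream_output logs each line of output as it occurs and return_all returns every line rather than only the last:

from prefect import Flow
from prefect.tasks.shell import ShellTask

shell = ShellTask(stream_output=True, return_all=True, log_stderr=True)

with Flow("shell-logging") as flow:
    # each line of stdout is streamed to the logs as the script runs
    out = shell(command="bash my_script.sh")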
Walter Cavinaw
08/30/2022, 1:01 AM
Yousef Hosny
08/30/2022, 1:47 AM
deployment.apply() doesn't work from a Jupyter notebook cell, but does work from a .py script file.
Also, is there any way to deploy a flow to Prefect Cloud using a Jupyter notebook?
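For the scripted route, a sketch assuming the Deployment.build_from_flow API introduced in Prefect 2.1 (the flow import and names are placeholders):

from prefect.deployments import Deployment
from my_flows import my_flow  # placeholder: any @flow-decorated function

deployment = Deployment.build_from_flow(
    flow=my_flow,
    name="my-deployment",
    work_queue_name="default",
)
# registers the deployment with the API (Cloud or Orion)
deployment.apply()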
Tommy Nam
08/30/2022, 2:22 AM
Anat Tal Gagnon
08/30/2022, 4:17 AM
Faheem Khan
08/30/2022, 7:33 AM
Anat Tal Gagnon
08/30/2022, 8:25 AM
Adrien Besnard
08/30/2022, 9:12 AM
• Since the prefect_dbt and prefect_airbyte collections exist, I was wondering if it makes sense to have something like a dedicated block for Airbyte (which would allow us to store the server_url but also invoke a trigger_sync function), and the same question for a dbt block (storing all the information we can find in profiles.yml, for example)?
• What is the best way to log from a function of a Block subclass? Can we use get_run_logger(), or is it preferable to use some sort of callbacks that are invoked inside a @task or something?
Thanks!
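On the second question, a sketch of a custom Block subclass (the Airbyte-specific field and method are assumptions for illustration, not part of the prefect_airbyte collection): get_run_logger() works inside a block method as long as that method is invoked from within a flow or task run:

from prefect import get_run_logger
from prefect.blocks.core import Block

class AirbyteServer(Block):
    """Hypothetical user-defined block storing Airbyte connection info."""
    server_url: str

    def trigger_sync(self, connection_id: str) -> None:
        # valid only when called from inside a flow/task run context;
        # otherwise get_run_logger() raises MissingContextError
        logger = get_run_logger()
        logger.info("Triggering sync for %s at %s", connection_id, self.server_url)
        # ... call the Airbyte API here ...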
Aditya Sharma
08/30/2022, 10:47 AM
Parwez Noori
08/30/2022, 11:20 AM
Tarek
08/30/2022, 12:59 PM
Josh Paulin
08/30/2022, 1:24 PM
Anat Tal Gagnon
08/30/2022, 1:39 PM
Tim Enders
08/30/2022, 1:56 PM
_pickle.PicklingError: Pickling client objects is explicitly not supported. Clients have non-trivial state that is local and unpickleable.
Pickle error on clients in Prefect 2.0?
Anna Geller
08/30/2022, 2:47 PM
Tim Enders
08/30/2022, 3:13 PM
The BigQuery client (bqclient) throws the pickling error.
import os

from google.cloud import bigquery
from google.cloud.bigquery.job import LoadJob
from prefect import flow, get_run_logger, unmapped
from prefect.orion.schemas.states import Completed, Failed

# helper tasks (query_highwater, get_params, get_pages_list, get_items_list,
# transform_data, load_df_bq) and AccountsClient are defined elsewhere

@flow
def main(full_active_load=False, full_load=False):
    logger = get_run_logger()
    token = os.environ.get("PSH_API_TOKEN")
    client = AccountsClient(
        token,
        token_url="REDACTED",
        api_url="REDACTED",
        client_id="REDACTED",
    )
    dest_dataset = os.environ.get("GCP_ACCOUNTS_DATASET")
    query = f"select max(CAST(id AS INT64)) as last_id from `{dest_dataset}.subscriptions_orion` where api_source = 'accounts'"
    highwater = query_highwater(query)
    params = get_params(highwater, full_active_load, full_load)
    pages_list = get_pages_list(client, "subscriptions", params)
    items = get_items_list.map(
        unmapped(client), unmapped("subscriptions"), pages_list[:100]
    )
    sub_data = []
    for item in items:
        sub_data.extend(item.result())
    if not sub_data:
        return Completed(message="No data from the API")
    else:
        transformed_data = transform_data(sub_data)
        bqclient = bigquery.Client()
        bq_result = load_df_bq(bqclient, transformed_data)
        if isinstance(bq_result, LoadJob) and bq_result.state == "DONE":
            return Completed(message="Load Finished!")
        elif isinstance(bq_result, Failed):
            return bq_result
        else:
            return Failed(message="Load Failure")
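One common workaround for unpickleable clients, as a sketch: construct the client inside the task that uses it (here load_df_bq, mirroring the snippet above), so the client object never has to be serialized as a task argument or result:

from google.cloud import bigquery
from prefect import task

@task
def load_df_bq(df, destination="dataset.table"):  # destination is illustrative
    # build the unpickleable client inside the task instead of passing
    # it in from the flow
    bqclient = bigquery.Client()
    job = bqclient.load_table_from_dataframe(df, destination)
    return job.result()  # blocks until the load job finishes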
Anna Geller
08/30/2022, 4:24 PM
Tim Enders
08/30/2022, 4:25 PM
Anna Geller
08/30/2022, 4:28 PM
Tim Enders
08/30/2022, 4:30 PM