Peter
06/23/2020, 12:01 AM
flow.run(). I've read through the documentation on logging, which recommends using prefect's utility function get_logger() to hook into a flow's logs. That works fine for a new application, but it would be a ton of work for us to change our existing applications that use python's standard logging practice - essentially defining logger = logging.getLogger(__name__) at the top of the module. Unfortunately, when we call flow.run() with tasks that invoke our existing application that uses standard logging, it looks like prefect disables those logs. Is there any way to tell prefect to log from existing applications that use python's standard logging.getLogger instead of prefect's get_logger utility?
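One possible approach, as a sketch rather than an official recommendation: Prefect's get_logger() returns a standard logging.Logger, so its handlers can be attached to an existing application logger and those records then show up alongside the flow run logs. The module name below is hypothetical.

import logging
import prefect.utilities.logging

# Sketch: forward an existing application's standard-library logger into
# Prefect's log stream by re-using Prefect's configured handlers.
app_logger = logging.getLogger("my_existing_app")  # hypothetical logger name
for handler in prefect.utilities.logging.get_logger().handlers:
    app_logger.addHandler(handler)
app_logger.setLevel(logging.INFO)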
Peter
06/23/2020, 12:01 AM
Sandeep Aggarwal
06/23/2020, 1:36 AM
Kai Weber
06/23/2020, 5:59 AM
Simone Cittadini
06/23/2020, 7:11 AM
Simone Cittadini
06/23/2020, 8:02 AM
Matias Godoy
06/23/2020, 12:52 PM
mutation {
  create_flow_run(input: {
    flow_run_name: "Test"
    version_group_id: "81261519-e91c-4fe3-bf85-10cc3a2d5016"
  }) {
    id
  }
}
But I get the following error:
{
  "graphQLErrors": [
    {
      "path": [
        "create_flow_run"
      ],
      "message": "Version group 81261519-e91c-4fe3-bf85-10cc3a2d5016 has no unarchived flows.",
      "extensions": {
        "code": "INTERNAL_SERVER_ERROR"
      }
    }
  ],
  "networkError": null,
  "message": "GraphQL error: Version group 81261519-e91c-4fe3-bf85-10cc3a2d5016 has no unarchived flows."
}
I'm sure the group ID is correct, and that there is a version of a test flow I created. I can even run it manually.
What am I doing wrong?
Thanks!
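A quick way to check whether the flows under that version group have been archived, as a sketch using the Python client's graphql helper (the query shape assumes the Hasura-style schema Prefect exposes):

from prefect import Client

# Sketch: list the flows in the version group along with their archived flag.
client = Client()
result = client.graphql(
    """
    query {
      flow(where: {version_group_id: {_eq: "81261519-e91c-4fe3-bf85-10cc3a2d5016"}}) {
        id
        name
        version
        archived
      }
    }
    """
)
print(result)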
Howard Cornwell
06/23/2020, 1:19 PM
import prefect
import sqlalchemy as sql

engine = sql.create_engine("postgresql://tmp:tmp@localhost:5432/tmp")
tmp_schema = sql.MetaData()
table = sql.Table("tmp", tmp_schema, sql.Column("tmp", sql.Integer))

@prefect.task
def create_table():
    tmp_schema.create_all(engine, tables=[table])

with prefect.Flow("tmp") as flow:
    create_table()

flow.storage = prefect.environments.storage.Docker(
    registry_url="localhost:5000",
    python_dependencies=["sqlalchemy"]
)
flow.register()
Raises
Traceback (most recent call last):
  File "issue.py", line 23, in <module>
    flow.register()
  File "/Users/howardcornwell/files/dev/misc/prefect-pickle-issue/.venv/lib/python3.8/site-packages/prefect/core/flow.py", line 1437, in register
    registered_flow = client.register(
  File "/Users/howardcornwell/files/dev/misc/prefect-pickle-issue/.venv/lib/python3.8/site-packages/prefect/client/client.py", line 649, in register
    serialized_flow = flow.serialize(build=build)  # type: Any
  File "/Users/howardcornwell/files/dev/misc/prefect-pickle-issue/.venv/lib/python3.8/site-packages/prefect/core/flow.py", line 1299, in serialize
    storage = self.storage.build()  # type: Optional[Storage]
  File "/Users/howardcornwell/files/dev/misc/prefect-pickle-issue/.venv/lib/python3.8/site-packages/prefect/environments/storage/docker.py", line 293, in build
    self._build_image(push=push)
  File "/Users/howardcornwell/files/dev/misc/prefect-pickle-issue/.venv/lib/python3.8/site-packages/prefect/environments/storage/docker.py", line 329, in _build_image
    dockerfile_path = self.create_dockerfile_object(directory=tempdir)
  File "/Users/howardcornwell/files/dev/misc/prefect-pickle-issue/.venv/lib/python3.8/site-packages/prefect/environments/storage/docker.py", line 436, in create_dockerfile_object
    cloudpickle.dump(self._flows[flow_name], f)
  File "/Users/howardcornwell/files/dev/misc/prefect-pickle-issue/.venv/lib/python3.8/site-packages/cloudpickle/cloudpickle_fast.py", line 48, in dump
    CloudPickler(file, protocol=protocol, buffer_callback=buffer_callback).dump(obj)
  File "/Users/howardcornwell/files/dev/misc/prefect-pickle-issue/.venv/lib/python3.8/site-packages/cloudpickle/cloudpickle_fast.py", line 548, in dump
    return Pickler.dump(self, obj)
TypeError: cannot pickle '_thread._local' object
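A common workaround for this class of pickling error, sketched under the assumption that the module-level engine is what cannot be pickled: build the engine and metadata inside the task at run time, so the serialized flow never holds the Engine object.

import prefect
import sqlalchemy as sql

@prefect.task
def create_table():
    # Created at run time instead of import time, so nothing unpicklable
    # is attached to the flow when it is serialized into Docker storage.
    engine = sql.create_engine("postgresql://tmp:tmp@localhost:5432/tmp")
    tmp_schema = sql.MetaData()
    table = sql.Table("tmp", tmp_schema, sql.Column("tmp", sql.Integer))
    tmp_schema.create_all(engine, tables=[table])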
Miguel
06/23/2020, 1:44 PM
client.create_flow_run to generate all the necessary runs with the correct context, but, at least using the local agent, this creates way too many processes (one for each run, I guess).
Is this the right approach, or is there a way to limit the concurrency / number of processes spawned?
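One alternative, sketched under the assumption that the runs differ only by an input value: map over the inputs inside a single flow run and let the executor bound the parallelism, instead of creating one flow run (and one process) per input. The task body and parameter are hypothetical.

import prefect
from prefect.engine.executors import LocalDaskExecutor

@prefect.task
def process(item):
    return item  # hypothetical per-item work

with prefect.Flow("batch") as flow:
    items = prefect.Parameter("items")
    process.map(items)

# The executor's worker pool, not one OS process per flow run, then bounds
# how much work runs in parallel.
flow.run(parameters={"items": list(range(100))}, executor=LocalDaskExecutor(scheduler="threads"))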
james.lamb
06/23/2020, 3:24 PM
If you use KubernetesJobEnvironment as the execution environment for a flow, any executor you pass will be ignored and only the default executor will be used
Jacques
06/23/2020, 3:51 PM
max_retries parameter. I'm catching the error from boto and then logging it immediately before doing a raise signals.FAIL() to trigger the retry mechanism. When the boto call fails (it does this once or twice a day, unpredictably) the error is caught, logs show the task is set to Retrying, and downstream tasks are set to Pending. All looks good until the flow is scheduled to run again; then I get a python stack overflow as some object is being pickled (I think - I'm seeing looped calls to bits like File "/var/lang/lib/python3.7/pickle.py", line 662 in save_reduce in the stack trace) directly after the Beginning Flow run message. I'm using DaskExecutor if that matters.
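For reference, a minimal sketch of the retry pattern being described; the AWS call, retry count, and delay are hypothetical stand-ins.

import datetime
import prefect
from prefect.engine import signals

def do_boto_call():
    # Stand-in for the real boto3 call that fails intermittently.
    raise RuntimeError("simulated intermittent AWS error")

@prefect.task(max_retries=3, retry_delay=datetime.timedelta(minutes=10))
def fetch_from_aws():
    logger = prefect.context.get("logger")
    try:
        do_boto_call()
    except Exception as exc:
        logger.error("boto call failed: %s", exc)
        raise signals.FAIL(f"boto call failed: {exc}")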
jeff sadler
06/23/2020, 5:04 PM
[2020-06-23 16:56:03] INFO - prefect.FlowRunner | Beginning Flow run for 'run_model'
[2020-06-23 16:56:03] INFO - prefect.FlowRunner | Starting flow run.
distributed.scheduler - INFO - Receive client connection: Client-6c8e597e-b572-11ea-ace9-ac1f6b4e4528
distributed.core - INFO - Starting established connection
distributed.scheduler - INFO - Remove client Client-6c8e597e-b572-11ea-ace9-ac1f6b4e4528
distributed.scheduler - INFO - Remove client Client-6c8e597e-b572-11ea-ace9-ac1f6b4e4528
distributed.scheduler - INFO - Close client connection: Client-6c8e597e-b572-11ea-ace9-ac1f6b4e4528
[2020-06-23 16:56:05] INFO - prefect.FlowRunner | Flow run FAILED: some reference tasks failed.
It's hard to know where to start with only "Flow run FAILED: some reference tasks failed."
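One way to see more than "some reference tasks failed", sketched for a flow run locally with flow.run(), where the returned state holds each task's state:

# Sketch: inspect the individual task states behind a failed local flow run.
state = flow.run()  # `flow` is the Flow object for 'run_model'
for task, task_state in state.result.items():
    if task_state.is_failed():
        print(task.name, "->", task_state.message)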
Chris Hart
06/23/2020, 8:34 PM
Michael Hoss
06/24/2020, 8:43 AM
psimakis
06/24/2020, 12:44 PM
itay livni
06/24/2020, 1:39 PM
Marwan Sarieddine
06/24/2020, 4:59 PM
prefect.client.client.Client - looking at the docs I see:
set_flow_run_state(
    self, flow_run_id: str, version: int, state: "prefect.engine.state.State"
)
where I can pass the flow_run_id and state, but I am trying to understand the intuition behind setting version. Would I have to find the version of the task the flow is currently on and increment it?
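A sketch of how the version is typically supplied: fetch the flow run's current version from the Client and pass it straight back; it serves as a concurrency check rather than something you increment yourself. The flow run id and target state below are hypothetical.

from prefect import Client
from prefect.engine.state import Failed

client = Client()
flow_run_id = "00000000-0000-0000-0000-000000000000"  # hypothetical flow run id

# get_flow_run_info returns the run's current state version, among other fields.
info = client.get_flow_run_info(flow_run_id)
client.set_flow_run_state(
    flow_run_id=flow_run_id,
    version=info.version,  # the current version, not an incremented one
    state=Failed("set manually"),
)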
Braun Reyes
06/24/2020, 11:25 PM
Rafal
06/25/2020, 7:59 AM
Howard Cornwell
06/25/2020, 10:47 AM
Eamon Keane
06/25/2020, 11:00 AM
CreateNamespacedPod task and will the prefect agent monitor the launched pod's exit status (via kubernetes api) as an indicator of task success/failure?
Darragh
06/25/2020, 11:36 AM
josh
06/25/2020, 1:58 PM
0.12.1 has been released! Here are a few notable highlights:
📁 File-based storage for flows that unlocks the ability for flows to “hot reload”
🚢 Flows stored in blob/bucket storage can be orchestrated with containerized agents/environments
:dask: Improved/streamlined environment and executor relationship
🏗️ Task slugs are now stable across rebuilds of the same flow
Read the full changelog here: https://github.com/PrefectHQ/prefect/releases/tag/0.12.1
Special thanks to all of the contributors who have assisted in discussions, issues, and PRs that have contributed to this release!
Jeff Brainerd
06/25/2020, 5:23 PM
agent.py that the container env should include the env var: "PREFECT__CLOUD__AUTH_TOKEN": config.cloud.agent.auth_token,
So at this point I'm not sure if that value is not set, or if it is set correctly but the agent token doesn't have the correct auth. PS - I am open to doing this another way, such as a dedicated SlackTask, but the state handler seems somehow more semantically correct, and the CloudHook doesn't seem to provide that kind of detailed info. (Sorry for the long post.) Thanks! 🙏
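For context, a minimal sketch of the state-handler approach being weighed here; the webhook URL is hypothetical and would normally come from a Secret rather than being hard-coded.

import requests

# Sketch: a state handler that posts failure details to Slack.
def notify_slack(obj, old_state, new_state):
    if new_state.is_failed():
        requests.post(
            "https://hooks.slack.com/services/T000/B000/XXXX",  # hypothetical webhook
            json={"text": f"{obj.name} failed: {new_state.message}"},
        )
    return new_state

# Attached with e.g. Flow("my-flow", state_handlers=[notify_slack]) or
# @task(state_handlers=[notify_slack]).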
Hannah Amundson
06/25/2020, 8:25 PM
Jeremiah
06/25/2020, 9:58 PM
Alfie
06/26/2020, 2:55 AM
Alfie
06/26/2020, 2:57 AM
Alfie
06/26/2020, 2:57 AM
Kostas Chalikias
06/26/2020, 8:38 AM
Kostas Chalikias
06/26/2020, 8:38 AM
Arsenii
06/26/2020, 9:04 AM
Kostas Chalikias
06/26/2020, 4:53 PM
nicholas
06/26/2020, 4:58 PM
Kostas Chalikias
06/26/2020, 5:31 PM
nicholas
06/26/2020, 5:42 PM
Kostas Chalikias
06/26/2020, 5:50 PM
nicholas
06/26/2020, 5:51 PM