Sandeep Aggarwal
06/23/2020, 1:36 AMKai Weber
06/23/2020, 5:59 AMSimone Cittadini
06/23/2020, 7:11 AMSimone Cittadini
06/23/2020, 8:02 AMMatias Godoy
06/23/2020, 12:52 PMmutation {
create_flow_run(input: {
flow_run_name: "Test"
version_group_id: "81261519-e91c-4fe3-bf85-10cc3a2d5016"
}) {
id
}
}
But I get the following error:
{
"graphQLErrors": [
{
"path": [
"create_flow_run"
],
"message": "Version group 81261519-e91c-4fe3-bf85-10cc3a2d5016 has no unarchived flows.",
"extensions": {
"code": "INTERNAL_SERVER_ERROR"
}
}
],
"networkError": null,
"message": "GraphQL error: Version group 81261519-e91c-4fe3-bf85-10cc3a2d5016 has no unarchived flows."
}
I'm sure the group ID is correct, and that there is a version of a test flow I created. I can even run it manually.
What am I doing wrong?
Thanks!Howard Cornwell
06/23/2020, 1:19 PMimport prefect
import sqlalchemy as sql
engine = sql.create_engine("<postgresql://tmp:tmp@localhost:5432/tmp>")
tmp_schema = sql.MetaData()
table = sql.Table("tmp", tmp_schema, sql.Column("tmp", sql.Integer))
@prefect.task
def create_table():
tmp_schema.create_all(engine, tables=[table])
with prefect.Flow("tmp") as flow:
create_table()
flow.storage = prefect.environments.storage.Docker(
registry_url="localhost:5000",
python_dependencies=["sqlalchemy"]
)
flow.register()
Raises
Traceback (most recent call last):
File "issue.py", line 23, in <module>
flow.register()
File "/Users/howardcornwell/files/dev/misc/prefect-pickle-issue/.venv/lib/python3.8/site-packages/prefect/core/flow.py", line 1437, in register
registered_flow = client.register(
File "/Users/howardcornwell/files/dev/misc/prefect-pickle-issue/.venv/lib/python3.8/site-packages/prefect/client/client.py", line 649, in register
serialized_flow = flow.serialize(build=build) # type: Any
File "/Users/howardcornwell/files/dev/misc/prefect-pickle-issue/.venv/lib/python3.8/site-packages/prefect/core/flow.py", line 1299, in serialize
storage = self.storage.build() # type: Optional[Storage]
File "/Users/howardcornwell/files/dev/misc/prefect-pickle-issue/.venv/lib/python3.8/site-packages/prefect/environments/storage/docker.py", line 293, in build
self._build_image(push=push)
File "/Users/howardcornwell/files/dev/misc/prefect-pickle-issue/.venv/lib/python3.8/site-packages/prefect/environments/storage/docker.py", line 329, in _build_image
dockerfile_path = self.create_dockerfile_object(directory=tempdir)
File "/Users/howardcornwell/files/dev/misc/prefect-pickle-issue/.venv/lib/python3.8/site-packages/prefect/environments/storage/docker.py", line 436, in create_dockerfile_object
cloudpickle.dump(self._flows[flow_name], f)
File "/Users/howardcornwell/files/dev/misc/prefect-pickle-issue/.venv/lib/python3.8/site-packages/cloudpickle/cloudpickle_fast.py", line 48, in dump
CloudPickler(file, protocol=protocol, buffer_callback=buffer_callback).dump(obj)
File "/Users/howardcornwell/files/dev/misc/prefect-pickle-issue/.venv/lib/python3.8/site-packages/cloudpickle/cloudpickle_fast.py", line 548, in dump
return Pickler.dump(self, obj)
TypeError: cannot pickle '_thread._local' object
Miguel
06/23/2020, 1:44 PMclient.create_flow_run
to generate all the necessary runs with the correct context but, at least using the local agent, this creates way too many processes (one for each run, I guess).
Is this this right approach or is there a way to limit the concurrency / number of processes spawned?james.lamb
06/23/2020, 3:24 PMIf you useas the execution environment for a flow, anyKubernetesJobEnvironment
you pass will be ignored and only the default executor will be usedexecutor
Jacques
06/23/2020, 3:51 PMmax_retries
parameter. I'm catching the error from boto and then logging the error immediately before doing a raise signals.FAIL()
to trigger the retry mechanism. When the boto call fails (it does this once or twice a day - unpredictably) the error is caught, logs show the task is set to Retrying
, and downstream tasks are set to Pending
. All looks good until the flow is scheduled to run again, then I get a python stack overflow as some object is being pickled (I think - seeing looped calls to bits like File "/var/lang/lib/python3.7/pickle.py", line 662 in save_reduce
in the stack trace) directly after the Beginning Flow run
message. I'm using DaskExecutor
if that matters.jeff sadler
06/23/2020, 5:04 PM[2020-06-23 16:56:03] INFO - prefect.FlowRunner | Beginning Flow run for 'run_model'
[2020-06-23 16:56:03] INFO - prefect.FlowRunner | Starting flow run.
distributed.scheduler - INFO - Receive client connection: Client-6c8e597e-b572-11ea-ace9-ac1f6b4e4528
distributed.core - INFO - Starting established connection
distributed.scheduler - INFO - Remove client Client-6c8e597e-b572-11ea-ace9-ac1f6b4e4528
distributed.scheduler - INFO - Remove client Client-6c8e597e-b572-11ea-ace9-ac1f6b4e4528
distributed.scheduler - INFO - Close client connection: Client-6c8e597e-b572-11ea-ace9-ac1f6b4e4528
[2020-06-23 16:56:05] INFO - prefect.FlowRunner | Flow run FAILED: some reference tasks failed.
It's hard to know where to start with only Flow run FAILED: some reference tasks failed.
Chris Hart
06/23/2020, 8:34 PMMichael Hoss
06/24/2020, 8:43 AMpsimakis
06/24/2020, 12:44 PMitay livni
06/24/2020, 1:39 PMMarwan Sarieddine
06/24/2020, 4:59 PMprefect.client.client.Client
- looking at the docs I see:
set_flow_run_state(
self, flow_run_id: str, version: int, state: "prefect.engine.state.State"
)
where I can pass the flow_run_id
and state
, but I am trying to understand the intuition behind setting version
Would I have to find the version of the task the flow is currently on and increment it ?Braun Reyes
06/24/2020, 11:25 PMRafal
06/25/2020, 7:59 AMHoward Cornwell
06/25/2020, 10:47 AMEamon Keane
06/25/2020, 11:00 AMCreateNamespacedPod
task and will the prefect agent monitor the launched pod's exit status (via kubernetes api) as an indicator of task success/failure?Darragh
06/25/2020, 11:36 AMjosh
06/25/2020, 1:58 PM0.12.1
has been released! Here are a few notable highlights:
📁 File-based storage for flows that unlocks the ability for flows to “hot reload”
🚢 Flows stored in blob/bucket storage can be orchestrated with containerized agents/environments
dask Improved/streamlined environment and executor relationship
🏗️ Task slugs are now stable across rebuilds of the same flow
Read the full changelog here: https://github.com/PrefectHQ/prefect/releases/tag/0.12.1
Special thanks to all of the contributors who have assisted in discussions, issues, and PRs that have contributed to this release!Jeff Brainerd
06/25/2020, 5:23 PMagent.py
that the container env should include the env var: "PREFECT__CLOUD__AUTH_TOKEN": config.cloud.agent.auth_token,
So at this point I’m not sure if that value is not set, or it is set correctly but the agent token doesn’t have the correct auth. PS — I am open to doing this another way, such as a dedicated SlackTask, but the state handler seems somehow more semantically correct, and the CloudHook doesn’t seem to provide that kind of detailed info. (Sorry for the long post.) Thanks! 🙏Hannah Amundson
06/25/2020, 8:25 PMJeremiah
Alfie
06/26/2020, 2:55 AMAlfie
06/26/2020, 2:57 AMAlfie
06/26/2020, 2:57 AMKostas Chalikias
06/26/2020, 8:38 AMLuis Muniz
06/26/2020, 9:54 AMLuis Muniz
06/26/2020, 11:35 AMprefect agent start
This results in an error complaining about missing an API token.
prefect.utilities.exceptions.AuthorizationError: No agent API token provided.
When I then call
prefect auth create-token -n calypso -s RUNNER
I see the following error:
Traceback (most recent call last):
File "/home/lmuniz/.local/bin/prefect", line 8, in <module>
sys.exit(cli())
File "/home/lmuniz/.local/lib/python3.8/site-packages/click/core.py", line 829, in __call__
return self.main(*args, **kwargs)
File "/home/lmuniz/.local/lib/python3.8/site-packages/click/core.py", line 782, in main
rv = self.invoke(ctx)
File "/home/lmuniz/.local/lib/python3.8/site-packages/click/core.py", line 1259, in invoke
return _process_result(sub_ctx.command.invoke(sub_ctx))
File "/home/lmuniz/.local/lib/python3.8/site-packages/click/core.py", line 1259, in invoke
return _process_result(sub_ctx.command.invoke(sub_ctx))
File "/home/lmuniz/.local/lib/python3.8/site-packages/click/core.py", line 1066, in invoke
return ctx.invoke(self.callback, **ctx.params)
File "/home/lmuniz/.local/lib/python3.8/site-packages/click/core.py", line 610, in invoke
return callback(*args, **kwargs)
File "/home/lmuniz/.local/lib/python3.8/site-packages/prefect/cli/auth.py", line 201, in create_token
output = client.graphql(
File "/home/lmuniz/.local/lib/python3.8/site-packages/prefect/client/client.py", line 213, in graphql
result = <http://self.post|self.post>(
File "/home/lmuniz/.local/lib/python3.8/site-packages/prefect/client/client.py", line 172, in post
response = self._request(
File "/home/lmuniz/.local/lib/python3.8/site-packages/prefect/client/client.py", line 334, in _request
json_resp = response.json()
File "/usr/lib/python3/dist-packages/requests/models.py", line 897, in json
return complexjson.loads(self.text, **kwargs)
File "/usr/lib/python3/dist-packages/simplejson/__init__.py", line 518, in loads
return _default_decoder.decode(s)
File "/usr/lib/python3/dist-packages/simplejson/decoder.py", line 370, in decode
obj, end = self.raw_decode(s)
File "/usr/lib/python3/dist-packages/simplejson/decoder.py", line 400, in raw_decode
return self.scan_once(s, idx=_w(s, idx).end())
simplejson.errors.JSONDecodeError: Expecting value: line 1 column 1 (char 0)
I have started prefect with the recommended prefect server start
command and installed prefect with pip3.
$prefect diagnostics
{
"config_overrides": {},
"env_vars": [],
"system_information": {
"platform": "Linux-5.4.0-7624-generic-x86_64-with-glibc2.29",
"prefect_version": "0.12.1",
"python_version": "3.8.2"
}
}