Josh Paulin
07/13/2022, 3:55 PM
with case(cond, True):
    val1_if_true = action1_if_true()
    val2_if_true = action2_if_true()
with case(cond, False):
    val_if_false = action_if_false()
What (if anything) do I pass to the merge function?
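If this is Prefect 1.x, merge is usually given the terminal task of each branch and returns the first result that was not skipped — e.g. merge(val2_if_true, val_if_false) here, or one merge call per paired value. A plain-Python sketch of that selection rule (illustrative only; SKIPPED is a stand-in for Prefect's Skipped state, not Prefect's actual implementation):

```python
SKIPPED = object()  # stand-in for Prefect's Skipped state (assumption)

def merge(*branch_values):
    """Return the first branch value that actually ran.

    Mirrors the idea behind a conditional merge: each branch yields
    either a real value or a skipped marker, and merge picks the
    first real one.
    """
    for value in branch_values:
        if value is not SKIPPED:
            return value
    return None  # no branch ran
```

Only one branch actually runs under case, so at most one argument carries a real value at runtime.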
Sabir Ali
07/13/2022, 5:21 PM
Traceback (most recent call last):
  File "/Users/sabirali/PycharmProjects/ETL/ElasticSearchClientTest.py", line 5, in <module>
    print(client.info())
  File "/Users/sabirali/PycharmProjects/ETL/venv/lib/python3.8/site-packages/elasticsearch/_sync/client/utils.py", line 414, in wrapped
    return api(*args, **kwargs)
  File "/Users/sabirali/PycharmProjects/ETL/venv/lib/python3.8/site-packages/elasticsearch/_sync/client/__init__.py", line 2277, in info
    return self.perform_request(  # type: ignore[return-value]
  File "/Users/sabirali/PycharmProjects/ETL/venv/lib/python3.8/site-packages/elasticsearch/_sync/client/_base.py", line 332, in perform_request
    raise UnsupportedProductError(
elasticsearch.UnsupportedProductError: The client noticed that the server is not Elasticsearch and we do not support this unknown product
I used the following command to install the Elasticsearch client:
(venv) sabirali@Sabirs-MacBook-Pro ETL % pip install elasticsearch
Collecting elasticsearch
Using cached elasticsearch-8.3.1-py3-none-any.whl (382 kB)
Requirement already satisfied: elastic-transport<9,>=8 in ./venv/lib/python3.8/site-packages (from elasticsearch) (8.1.2)
Requirement already satisfied: urllib3<2,>=1.26.2 in ./venv/lib/python3.8/site-packages (from elastic-transport<9,>=8->elasticsearch) (1.26.9)
Requirement already satisfied: certifi in ./venv/lib/python3.8/site-packages (from elastic-transport<9,>=8->elasticsearch) (2022.6.15)
Installing collected packages: elasticsearch
Successfully installed elasticsearch-8.3.1
WARNING: You are using pip version 21.3.1; however, version 22.1.2 is available.
You should consider upgrading via the '/Users/sabirali/PycharmProjects/ETL/venv/bin/python -m pip install --upgrade pip' command.
(venv) sabirali@Sabirs-MacBook-Pro ETL %
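This UnsupportedProductError usually appears when an 8.x elasticsearch-py client talks to a server that does not identify itself as Elasticsearch 8-compatible — for example an older 7.x cluster or an OpenSearch fork. A hedged first step is to check what the server reports and pin the client's major version to match; the 7.x pin below is an assumption, adjust to whatever the server actually is:

```shell
# Check the server's reported version first (adjust host/port as needed):
curl -s http://localhost:9200 | grep '"number"'

# If the server is Elasticsearch 7.x, pin the client to the same major:
pip install "elasticsearch>=7,<8"
```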
Divya
07/13/2022, 6:33 PM

Frank Hardisty
07/13/2022, 8:25 PM

Victoria Alvarez
07/13/2022, 8:47 PM

Ryan
07/14/2022, 2:05 AM

Amanda Wee
07/14/2022, 2:46 AM

Максим Пышный
07/14/2022, 2:50 AM

Shivam Bhatia
07/14/2022, 7:46 AM
400 List of found errors:
1. Field: job_spec.worker_pool_specs[0].container_spec.env[12].value; Message: Required field is not set.
2. Field: job_spec.worker_pool_specs[0].container_spec.env[3].value; Message: Required field is not set.
3. Field: job_spec.worker_pool_specs[0].container_spec.env[4].value; Message: Required field is not set.
[field_violations {
field: "job_spec.worker_pool_specs[0].container_spec.env[12].value"
description: "Required field is not set."
}
field_violations {
field: "job_spec.worker_pool_specs[0].container_spec.env[3].value"
description: "Required field is not set."
}
field_violations {
field: "job_spec.worker_pool_specs[0].container_spec.env[4].value"
description: "Required field is not set."
}
]
Do I need to configure Vertex jobs manually?

Faheem Khan
07/14/2022, 8:54 AM
_pickle.PicklingError: Can't pickle <function db_con at 0x7f85c986f880>: it's not the same object as __main__.db_con
when I run a task that queries a database. The code runs fine without the db connection task.
Nikolaus Landgraf
07/14/2022, 10:01 AM

Tom Klein
07/14/2022, 11:35 AM
X that spawn several "children" tasks Y1 ... Yn
let's say some of them are stuck on Running
as a finished state - but I wanna guarantee the next phase in the flow executes correctly and ignores the few "hung" tasks -- what's the (default) behavior if I set those tasks (e.g. Y3, Y17 and Y91) to Skipped? Would the next task that depends on them still get executed (even if it has the default all_successful trigger)? The reason I'm asking about Skipped is because I wanna avoid a None response flowing downstream from these tasks.
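For context: in Prefect 1.x, Skipped is treated as a success by the all_successful trigger, but by default a task whose upstream was skipped is itself skipped (and passes None downstream) unless it sets skip_on_upstream_skip=False. A toy model of that rule in plain Python — an illustrative sketch of the documented behavior, not Prefect's code:

```python
SUCCESS_LIKE = {"Success", "Skipped"}  # Skipped satisfies all_successful

def downstream_state(upstream_states, skip_on_upstream_skip=True):
    """Decide what happens to a task under the all_successful trigger."""
    if skip_on_upstream_skip and "Skipped" in upstream_states:
        # Default: skips propagate, so the downstream task is skipped too
        # (and yields None to anything further downstream).
        return "Skipped"
    if all(s in SUCCESS_LIKE for s in upstream_states):
        return "Running"          # trigger passes, task executes
    return "TriggerFailed"        # e.g. a Failed upstream
```

So marking Y3/Y17/Y91 as Skipped would not trip all_successful, but by default the downstream task would skip rather than run; setting skip_on_upstream_skip=False on the downstream task makes it run despite the skips.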
Rajeshwar Agrawal
07/14/2022, 12:15 PM
config.toml setting for disabling both heartbeat and lazarus for Prefect flows on Prefect Server?

Abhishek Mitra
07/14/2022, 2:59 PM
@task decorator?
I get pymysql.err.OperationalError: (2003, "Can't connect to MySQL server on 'xyz-mysql8-cluster.cluster-fa43asdfd.us-west-2.rds.amazonaws.com' (timed out)") every time. Any suggestions on how to approach solving this?

Vrinda Kailash
07/14/2022, 3:10 PM

FuETL
07/14/2022, 3:22 PM

Joshua Massover
07/14/2022, 4:22 PM
PREFECT__LOGGING__LEVEL=DEBUG
as an environment variable, confirmed it's in the environment, and my agent logs are not showing anything but INFO. Is there anything else that I need to set?

Thomas Opsomer
07/14/2022, 4:23 PM
task_run_state_aggregate
but I don't know how to filter on a specific task 😕
If anyone can help 🙏

Joshua Massover
07/14/2022, 5:51 PM
[2022-07-14 17:49:44,859] DEBUG - agent | Querying for ready flow runs...
DEBUG:agent:Querying for ready flow runs...
[2022-07-14 17:49:44,959] DEBUG - agent | No ready flow runs found.
DEBUG:agent:No ready flow runs found.
DEBUG:agent:Sleeping flow run poller for 10.0 seconds...
[2022-07-14 17:49:44,960] DEBUG - agent | Sleeping flow run poller for 10.0 seconds...
[2022-07-14 17:49:54,960] DEBUG - agent | Querying for ready flow runs...
DEBUG:agent:Querying for ready flow runs...
[2022-07-14 17:49:55,046] DEBUG - agent | No ready flow runs found.
DEBUG:agent:No ready flow runs found.
[2022-07-14 17:49:55,046] DEBUG - agent | Sleeping flow run poller for 10.0 seconds...
DEBUG:agent:Sleeping flow run poller for 10.0 seconds...
Anyone have any suggestions on how to continue to debug?

Matan Drory
07/14/2022, 5:55 PM

Minh Mai
07/14/2022, 6:08 PM
flow.run()
will fail once I register it to the UI? I'm currently using the local server:
prefect backend server
prefect server start
prefect agent local start
The error I'm getting is:
Failed to load and execute flow run: FlowStorageError('An error occurred while unpickling the flow:\n ModuleNotFoundError("No module named \'utils\'")\nThis may be due to a missing Python module in your current environment. Please ensure you have all required flow dependencies installed.')
Sergey Goncharov
07/14/2022, 8:21 PM
CreateAndRunJob
marked as succeeded, but Prefect does not wait for the end of the execution, and then the function get_our_pod_name fails because it cannot find the Job, which is not yet created.
I thought to use threading or queue from Python here, but I think it's not the right way with Prefect.
Could you please advise? I believe I'm doing something wrong here.
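One Prefect-friendly way to bridge this kind of race is to have the dependent task block until the Job is actually visible, using a simple poll-with-timeout helper rather than threads or queues. The sketch below is generic Python; the predicate would wrap whatever Kubernetes lookup applies (e.g. reading the Job by name — the names here are assumptions):

```python
import time

def wait_for(predicate, timeout=60.0, interval=2.0):
    """Poll predicate() until it returns truthy or the timeout expires.

    Returns True on success, False on timeout. Call it at the top of the
    task that needs the Job (e.g. get_our_pod_name) so downstream work
    only starts once the resource actually exists.
    """
    deadline = time.monotonic() + timeout
    while True:
        if predicate():
            return True
        if time.monotonic() >= deadline:
            return False
        time.sleep(interval)
```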
Cab Maddux
07/14/2022, 10:04 PM
prefect.packaging.docker.DockerPackager
class and have it push to Google Container Registry. It looks like the current package() method is unable to push to GCR because the prefect.docker.push_image here ignores any path provided in the registry URL.
So it looks to me like if I provide a registry_url like https://gcr.io/my-project-name (where my-project-name is a GCP project) for a flow named my-flow, then rather than pushing an image to gcr.io/my-project-name/my-flow, the current implementation will try to push to gcr.io/my-flow (which fails because I don't have a project named my-flow).
Wondering if I'm missing something here, or maybe this should go into an issue?
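As a sanity check outside Prefect, the push the packager is expected to perform can be done by hand with the full repository path, project included (assuming Docker is authenticated for GCR, e.g. via gcloud auth configure-docker; image and project names below are the ones from the message):

```shell
# Tag the locally built image with the full GCR repository path,
# including the project, then push it:
docker tag my-flow:latest gcr.io/my-project-name/my-flow:latest
docker push gcr.io/my-project-name/my-flow:latest
```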
Alex Fok
07/14/2022, 10:24 PM

Joshua Massover
07/14/2022, 10:38 PM
def patch_task():
    method = getattr(CloudTaskRunner, "run")

    def patched(self, *args, **kwargs):
        name = self.task.name
        with tracer.trace("task", resource=name):
            result = method(self, *args, **kwargs)
        tracer.flush()
        return result

    setattr(CloudTaskRunner, "run", patched)


def patch_flow():
    method = getattr(CloudFlowRunner, "run")

    def patched(self, *args, **kwargs):
        name = self.flow.name
        with tracer.trace("flow", resource=name):
            result = method(self, *args, **kwargs)
        tracer.flush()
        return result

    setattr(CloudFlowRunner, "run", patched)
Mohamed Hatem Diabi
07/15/2022, 12:13 AM
@task(log_stdout=True)
def create_subflows(list_of_elements):
    list_param = []
    for element in list_of_elements:
        list_param.append(
            {
                "pr": element,
            }
        )
    mapped_flows = create_flow_run.map(
        flow_name=unmapped("Subflow"),
        parameters=list_param,
    )

with Flow("Parent Flow") as flow:
    list_of_elements = [1, 2, 3]
    create_subflows(list_of_elements)
I am getting this error:
ValueError: Could not infer an active Flow context while creating edge to <Task: create_flow_run>. This often means you called a task outside a with Flow(...) block. If you're trying to run this task outside of a Flow context, you need to call create_flow_run.run(...)
Ievgenii Martynenko
07/15/2022, 6:19 AM

wonsun
07/15/2022, 6:56 AM
server.database.connection_url
or server.database.host
to connection information for a MySQL database, not Postgres?

Mickael ANDRIEU
07/15/2022, 8:11 AM

Eric So
07/15/2022, 8:38 AM
LocalDaskExecutor(scheduler="processes")
while with threads
logs can be seen in UI as normal, any ideas?
Anna Geller
07/15/2022, 9:42 AM