Anh Nguyen
01/13/2022, 12:59 AMAnh Nguyen
01/13/2022, 5:37 AMAqib Fayyaz
01/13/2022, 7:53 AMJamie McDonald
01/13/2022, 11:40 AMmap()
functionality? My scenario is a list of URLs that should be used for making requests to but want to perform the requests in batches of 'n' rather than overwhelming a server with them all at once.Martim Lobao
01/13/2022, 11:49 AMtask_run_name
and name
args in the task
decorator to provide more context, but neither works the way I’d like it to.
as a MWE, here’s a sketch of what I’d like to happen:
with Flow() as flow:
task_a(entity="foo") # shows up as "foo_task" in the DAG
task_a(entity="bar") # shows up as "bar_task" in the DAG
the issue is that name
only takes in static strings (so name="{entity}"
doesn’t work) and task_run_name
only sets the task run name, meaning it will never show up in the schematic outside of flow runs (even in flow runs, the name is only shown when clicking on each individual task card, making it hard to see an overall picture). is there any way to achieve what i’d like to do?davzucky
01/13/2022, 12:32 PM@task
def list_file_from_storate(prefix: str) -> List[str]:
result = prefect.context["result"]
return result.list_files(prefix)
Looking at the context doc I cannot see the result been available here https://docs.prefect.io/api/latest/utilities/context.html
We we want is to be able to interact with the result using task from the flow setup.Ahmed Rafik
01/13/2022, 12:39 PMimport pandas as pd
from prefect.deployments import DeploymentSpec
from prefect.orion.schemas.schedules import IntervalSchedule
from pred.predict import predict_flow
DeploymentSpec(
flow=predict_flow,
name="Prediction"
)
I create the deployment without a problem and I can see it in the UI. I also can run it successfully from CLI using:
prefect deployment execute 'Prediction flow/Prediction'
But When I try to run it using the “Quick Run” in the UI, a “Scheduled” task is created and is never run. I can see it in the lateness graph. one task didn’t run for over an hour during my lunch break. Same happens if I add a schedule to the deployment. any ideas why that happens or how to fix it?Andrew Hah
01/13/2022, 2:27 PMEmma Rizzi
01/13/2022, 3:30 PMprefect agent docker start ... &
and killing manually with linux signals
Is there a recommended method to do this ?brian
01/13/2022, 3:48 PMYusuf Khan
01/13/2022, 4:12 PMPablo Espeso
01/13/2022, 5:42 PMHwi Moon
01/13/2022, 6:10 PMBrett Naul
01/13/2022, 6:15 PMHwi Moon
01/13/2022, 6:25 PMMarwan Sarieddine
01/13/2022, 7:44 PMResourceManager
and using set_upstream
How can I ensure that a task is run after the resource manager cleanup ? (Please see the thread for more details)Jason Motley
01/13/2022, 10:28 PMcase
statement, can one rename it so the flow schematic is clearer? Example below, as opposed to the photo
with case(condition, True, tasK-args = {name: bla bla bla}):
Justin Grosvenor
01/13/2022, 10:39 PMLing Chen
01/13/2022, 11:26 PMTrevor Sweeney
01/14/2022, 12:35 AMWilliam Grim
01/14/2022, 2:31 AMprefecthq/server:latest
that came out yesterday, things like graphql
just go into crash loops in docker/kubernetes. I had to pin the version to 2021.11.30
to resolve crashing issues.
We also use hasure/graphql-engine:v1.3.0
, and it seems like that's been updated quite a few versions. Would upgrading that resolve the issues, you think? I'm not sure what major things have changed in hasura though.Ling Chen
01/14/2022, 5:36 AMcreate_flow_run
in normal python console but not in a python Flask app? The following minimal example will raise AttributeError: 'Context' object has no attribute 'logger'
error. Any ideas?
from flask import Flask
from prefect.tasks.prefect import create_flow_run
app = Flask(__name__)
@app.route("/")
def hello_world():
create_flow_run.run(flow_name="test_flow",
project_name="test_project")
return "<p>Create Prefect Flow Run From Flask!</p>"
andres aava
01/14/2022, 6:40 AMclass saveChartmogulActivities(Task):
def __init__(self, apikey, **kwargs):
self.apikey = apikey
self.endpoint = ENDPOINT_CHARTMOGUL_ACTIVITIES_EXPORT
self.tmp_file_path = TMP_FILE_PATH
super().__init__(**kwargs)
def run(self):
return self.apikey
apikey seems to be passed as Task not as actual value.
Flow looks like this, it seems to be something with my custom defined Task class, but could not figure it out or find quickly from community here.
Could anybody tell just by looking at it? :)
with Flow('chartmogul-extract', run_config=RUN_CONFIG) as flow:
# Reads Secrets from Prefect Cloud > Team > Secrets
chartmogul_secrets = PrefectSecret("CHARTMOGUL")
chartmogul_apikey = chartmogul_secrets['api_key']
# Extract
cm_activities_task = saveChartmogulActivities(chartmogul_apikey, log_stdout=True)
cm_activities_run = cm_activities_task()
Although when I pass apikey as hardcoded string value, it works.Arcondo Dasilva
01/14/2022, 9:04 AMEmil N.
01/14/2022, 10:02 AMTony Waddle
01/14/2022, 11:17 AMАндрій Демиденко
01/14/2022, 1:06 PMBlake Enyart
01/14/2022, 1:58 PMTask 'AirbyteConnectionTask[4]': Exception encountered during task execution!
Traceback (most recent call last):
File "/home/ec2-user/.local/lib/python3.7/site-packages/prefect/engine/task_runner.py", line 880, in get_task_run_state
logger=self.logger,
File "/home/ec2-user/.local/lib/python3.7/site-packages/prefect/utilities/executors.py", line 454, in run_task_with_timeout
return task.run(*args, **kwargs) # type: ignore
File "/home/ec2-user/.local/lib/python3.7/site-packages/prefect/utilities/tasks.py", line 456, in method
return run_method(self, *args, **kwargs)
File "/home/ec2-user/.local/lib/python3.7/site-packages/prefect/tasks/airbyte/airbyte.py", line 250, in run
self._check_health_status(session, airbyte_base_url)
File "/home/ec2-user/.local/lib/python3.7/site-packages/prefect/tasks/airbyte/airbyte.py", line 78, in _check_health_status
health_status = response.json()["db"]
KeyError: 'db'
I'm using it within a mapped flow. Has anyone run into this issue yet?Patrick Alves
01/14/2022, 2:01 PMimport prefect
from prefect import Flow, task
from prefect.run_configs import DockerRun
from prefect.storage import Docker
logger = prefect.context.get("logger")
@task
def task01():
<http://logger.info|logger.info>("Task 01")
@task
def task02():
<http://logger.info|logger.info>("Task 02")
@task
def task03():
<http://logger.info|logger.info>("Task 03")
with Flow("Check Computers",
storage=Docker(dockerfile="Dockerfile", image_name="check_computers", image_tag="latest"),
run_config=DockerRun(image="check_computers:latest")) as flow:
task01()
task02()
task03()
# Register the flow under the "tutorial" project
flow.register(project_name="CIS")
Then, in the logs I've got: Failed to load and execute Flow's environment: FileNotFoundError(2, 'No such file or directory')
Does anyone know what I am missing?yslx
01/14/2022, 2:14 PMyslx
01/14/2022, 2:14 PMKevin Kho
01/14/2022, 2:29 PMAnna Geller
01/14/2022, 2:29 PMyslx
01/14/2022, 2:40 PMKevin Kho
01/14/2022, 2:42 PMsys.path.append
but I haven’t seen it work. It’s pretty trickyyslx
01/14/2022, 2:43 PM