Luuk
07/26/2021, 4:59 AM
Samuel Tober
07/26/2021, 8:24 AM
Martin Felder
07/26/2021, 8:30 AM
Martin Felder
07/26/2021, 8:30 AM
Martin Felder
07/26/2021, 8:31 AM
Bruno Murino
07/26/2021, 9:06 AM
import prefect
from prefect.agent.ecs.agent import ECSAgent

agent = ECSAgent(
    region_name="eu-west-1",
    cluster="knightsbridge",
    labels=["ecs"],
    launch_type="EC2",
    agent_address="http://0.0.0.0:8009",
)
agent.start()
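The `labels=["ecs"]` argument above controls which flow runs this agent will pick up. Later in the thread, Kevin Kho notes that agent labels must match the flow labels. The sketch below illustrates that rule using only the standard library. It assumes the Prefect 1.x behaviour in which an agent picks up a flow run only when every label on the flow is also on the agent. `agent_can_run` is a hypothetical helper, not a Prefect API.

```python
from typing import Iterable


def agent_can_run(agent_labels: Iterable[str], flow_labels: Iterable[str]) -> bool:
    """Hypothetical helper: True if every flow label is also an agent label.

    Assumption: mirrors Prefect 1.x label matching, where the flow's labels
    must be a subset of the agent's labels.
    """
    return set(flow_labels) <= set(agent_labels)


if __name__ == "__main__":
    print(agent_can_run(["ecs"], ["ecs"]))          # True: same labels
    print(agent_can_run(["ecs"], ["ecs", "gpu"]))   # False: flow needs a label the agent lacks
    print(agent_can_run(["ecs", "gpu"], ["ecs"]))   # True: agent may carry extra labels
```

Under that assumption, the ECS agent above would not pick up a flow registered with, for example, the made-up labels `["ecs", "gpu"]`.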
Dotan Asselmann
07/26/2021, 11:42 AM
Hilary Roberts
07/26/2021, 12:58 PM
Pedro Machado
07/26/2021, 1:39 PM
Madison Schott
07/26/2021, 2:14 PM
RuntimeError: Error while contacting API at https://api.prefect.io
Bruno Murino
07/26/2021, 2:42 PM
Jelle Vegter
07/26/2021, 3:39 PM
Leon Kozlowski
07/26/2021, 4:37 PM
Pedro Machado
07/26/2021, 4:58 PM
`create_flow_run` and `get_task_run_result`, how can I assign a different name to each instance so that I can see which task instance is running in the UI without having to look at the logs?
itay livni
07/26/2021, 6:12 PM
mutation {
  delete_agent(input: {agent_id: "<id>"}) {
    success
  }
}
Thanks in advance
Madison Schott
07/26/2021, 7:32 PM
Bruno Centeno
07/26/2021, 7:46 PM
An Hoang
07/26/2021, 8:58 PM
`df.parquet` file:
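from prefect import context, task
from prefect.engine.results import LocalResult
from prefect.engine.serializers import PandasSerializer

parquet_result = LocalResult(dir="./test_prefect", serializer=PandasSerializer("parquet"))

@task
def test_task(df1, df2):
    parquet_result.write(df1, location="df1.parquet", **context)
    parquet_result.write(df2, location="df2.parquet", **context)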
Currently I have to set the `location` attribute when instantiating the `LocalResult` object. The code below works:
from functools import partial

parquet_result_partial = partial(LocalResult, dir="./test_prefect", serializer=PandasSerializer("parquet"))

@task
def test_task(df1, df2):
    parquet_result_partial(location="df1.parquet").write(df1, **context)
    parquet_result_partial(location="df2.parquet").write(df2, **context)
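This is a likely explanation for why the `location=` kwarg seems to do nothing, though I have not confirmed it against the Prefect source. In Prefect 1.x, `Result.write(value, **kwargs)` appears not to read a location from `kwargs` at all. Instead, it fills the `location` template stored on the result using `str.format(**kwargs)`, and falls back to an auto-generated default location when no template was set. The sketch below imitates that logic with a hypothetical `TemplatedResultSketch` class (not a Prefect API), so the behaviour can be checked without Prefect installed.

```python
from typing import Any, Optional


class TemplatedResultSketch:
    """Hypothetical stand-in for how a Prefect 1.x Result resolves its location.

    Assumption: the real logic lives in Result.format(); this only imitates it.
    """

    def __init__(self, location: Optional[str] = None,
                 default_location: str = "auto-generated-location"):
        self.location = location                  # template such as "{df_name}.parquet"
        self.default_location = default_location  # used when no template is set

    def resolve_location(self, **kwargs: Any) -> str:
        if isinstance(self.location, str):
            # Fill the template; keyword arguments it does not mention are ignored.
            return self.location.format(**kwargs)
        # No template: every kwarg, including location=, is ignored.
        return self.default_location


if __name__ == "__main__":
    no_template = TemplatedResultSketch()
    print(no_template.resolve_location(location="df1.parquet"))  # auto-generated-location

    templated = TemplatedResultSketch(location="{df_name}.parquet")
    print(templated.resolve_location(df_name="df1"))  # df1.parquet
```

If this reading is correct, a single templated result could replace the `partial` workaround. For example: `LocalResult(dir="./test_prefect", location="{df_name}.parquet", serializer=PandasSerializer("parquet"))`, followed by `parquet_result.write(df1, df_name="df1", **context)`. Here `df_name` is a made-up template variable, and this usage has not been run against Prefect.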
So it seems the `location` kwarg to `Result.write` does not do anything. Is this by design, or am I missing something?
Billy McMonagle
07/26/2021, 9:21 PM
from prefect import Flow, Parameter, task
from prefect.schedules import CronSchedule

@task
def my_task(my_parameter):
    print(f"my_parameter value is {my_parameter}")

with Flow("my-flow-1", schedule=CronSchedule("0 * * * *")) as flow1:
    param = Parameter("my_parameter", default="flow-1-parameter-value")
    my_task(param)

with Flow("my-flow-2", schedule=CronSchedule("1 * * * *")) as flow2:
    param = Parameter("my_parameter", default="flow-2-parameter-value")
    my_task(param)
Madison Schott
07/26/2021, 10:43 PM
with Flow("user_brand_campaigns", storage=STORAGE, config=RUN_CONFIG) as user_profile_flow:
TypeError: __init__() got an unexpected keyword argument 'config'
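In Prefect 1.x, the `Flow` constructor's keyword is `run_config`, not `config`. The call above would therefore presumably be `Flow("user_brand_campaigns", storage=STORAGE, run_config=RUN_CONFIG)`.

This kind of mismatch can be caught early with a small standard-library check: compare the keyword names against the callable's signature using `inspect.signature`, and suggest near matches with `difflib.get_close_matches`. In the sketch below, `flow_init_stand_in` is a hypothetical function whose keywords mirror only a subset of Prefect 1.x `Flow.__init__`. It is not the real signature.

```python
import difflib
import inspect
from typing import Any, Callable, Dict, List


def flow_init_stand_in(name, schedule=None, executor=None, run_config=None, storage=None):
    """Hypothetical stand-in: a subset of the keywords accepted by Prefect 1.x Flow()."""
    return name


def suggest_kwarg_fixes(func: Callable[..., Any], kwargs: Dict[str, Any]) -> Dict[str, List[str]]:
    """Map each keyword that `func` does not accept to its closest accepted names."""
    accepted = list(inspect.signature(func).parameters)
    return {
        key: difflib.get_close_matches(key, accepted)
        for key in kwargs
        if key not in accepted
    }


if __name__ == "__main__":
    print(suggest_kwarg_fixes(flow_init_stand_in, {"storage": None, "config": None}))
    # {'config': ['run_config']}
```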
Andre Muraro
07/27/2021, 12:50 AM
jake lee
07/27/2021, 8:28 AM
Nicholas Hemley
07/27/2021, 10:20 AM
Jelle Vegter
07/27/2021, 10:31 AM
Rinze
07/27/2021, 12:23 PM
Rinze
07/27/2021, 12:23 PM
Traceback (most recent call last):
  File "C:\Users\***\code\pipeline\src\flows_combined_entries.py", line 76, in <module>
    sqlserver.run(query='select * from julitest.entries where id = 50299',
  File "C:\Users\***\code\pipeline\venv\lib\site-packages\prefect\utilities\tasks.py", line 441, in method
    return run_method(self, *args, **kwargs)
  File "C:\Users\***\code\pipeline\venv\lib\site-packages\prefect\tasks\sql_server\sql_server.py", line 90, in run
    executed = cursor.execute(query=query, vars=data)
TypeError: execute() takes no keyword arguments
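The last frame shows the SQL Server task calling `cursor.execute(query=query, vars=data)` with keyword arguments. DB-API 2.0 cursors such as pyodbc's take the SQL string and its parameters positionally, which is what the `TypeError` is complaining about.

The sketch below shows the positional form. It uses the standard-library `sqlite3` module as a stand-in for pyodbc, since both use the `?` placeholder style. The `entries` table and its columns are made up for the example.

```python
import sqlite3
from typing import List, Tuple


def fetch_entry(conn: sqlite3.Connection, entry_id: int) -> List[Tuple[int, str]]:
    cursor = conn.cursor()
    # SQL first, then a parameter sequence, both positional; no query=/vars= keywords.
    cursor.execute("SELECT id, note FROM entries WHERE id = ?", (entry_id,))
    return cursor.fetchall()


if __name__ == "__main__":
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE entries (id INTEGER PRIMARY KEY, note TEXT)")
    conn.execute("INSERT INTO entries VALUES (?, ?)", (50299, "example row"))
    print(fetch_entry(conn, 50299))  # [(50299, 'example row')]
```

With pyodbc, the same call shape `cursor.execute(sql, params)` applies.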
Sean Talia
07/27/2021, 1:58 PM
prefect.utilities.exceptions.ClientError: [{'path': ['flow_run_by_pk'], 'message': 'request to http://hasura:3000/v1alpha1/graphql failed, reason: read ECONNRESET', 'extensions': {'code': 'INTERNAL_SERVER_ERROR', 'exception': {'message': 'request to http://hasura:3000/v1alpha1/graphql failed, reason: read ECONNRESET', 'type': 'system', 'errno': 'ECONNRESET', 'code': 'ECONNRESET'}}}]
I haven't run into this one before... this is on Cloud.
Pedro Machado
07/27/2021, 2:16 PM
Mohamed Hajji
07/27/2021, 2:23 PM
PS C:\Users\username> prefect config
{"debug": false, "home_dir": "C:\\Users\\username/.prefect", "backend": "server", "server": {"host": "http://localhost", "port": 4200, "host_port": 4200, "endpoint": "http://localhost:4200", "database": {"host": "localhost", "port": 5432, "host_port": 5432, "name": "prefect_server", "username": "prefect", "password": "test-password", "connection_url": "postgresql://prefect:test-password@localhost:5432/prefect_server", "volume_path": "C:\\Users\\Mohamed Hajji/.prefect/pg_data"}, "graphql": {"host": "0.0.0.0", "port": 4201, "host_port": 4201, "debug": false, "path": "/graphql/"}, "hasura": {"host": "localhost", "port": 3000, "host_port": 3000, "admin_secret": "", "claims_namespace": "hasura-claims", "graphql_url": "http://localhost:3000/v1alpha1/graphql", "ws_url": "ws://localhost:3000/v1alpha1/graphql", "execute_retry_seconds": 10}, "ui": {"host": "http://localhost", "port": 8080, "host_port": 8080, "endpoint": "http://localhost:8080", "apollo_url": "http://localhost:4200/graphql"}, "telemetry": {"enabled": true}}, "cloud": {"api": "http://localhost:4200", "endpoint": "https://api.prefect.io", "graphql": "http://localhost:4200/graphql", "use_local_secrets": true, "heartbeat_interval": 30.0, "check_cancellation_interval": 15.0, "diagnostics": false, "request_timeout": 15, "send_flow_run_logs": true, "logging_heartbeat": 5, "queue_interval": 30.0, "api_key": "", "tenant_id": "", "agent": {"name": "agent", "labels": [], "level": "INFO", "auth_token": "", "agent_address": "", "resource_manager": {"loop_interval": 60}}}, "logging": {"level": "INFO", "format": "[%(asctime)s] %(levelname)s - %(name)s | %(message)s", "log_attributes": [], "datefmt": "%Y-%m-%d %H:%M:%S%z", "extra_loggers": []}, "flows": {"eager_edge_validation": false, "run_on_schedule": true, "checkpointing": false, "defaults": {"storage": {"add_default_labels": true, "default_class": "prefect.storage.Local"}}}, "tasks": {"defaults": {"max_retries": 0, "retry_delay": null, "timeout": null}}, "engine": {"executor": {"default_class": "prefect.executors.LocalExecutor", "dask": {"address": "", "cluster_class": "distributed.deploy.local.LocalCluster"}}, "flow_runner": {"default_class": "prefect.engine.flow_runner.FlowRunner"}, "task_runner": {"default_class": "prefect.engine.task_runner.TaskRunner"}}}
Michael Warnock
07/27/2021, 2:32 PM
`flow.run(executor=ex)` as opposed to `create_flow_run` on a Prefect client. When doing the latter, I get no apparent attempt to start the cluster, and I see task output on my Docker agent (which, interestingly, is full of S3-permission-related crashes that don't happen if I don't specify my Coiled/Dask executor). When running with `flow.run`, the flow obviously doesn't appear in my dashboard. What's the expected behavior? Am I supposed to have some other kind of agent running?
Kevin Kho
07/27/2021, 2:37 PM
`flow.run()` does not appear on the dashboard; it is only for local testing. When you’re ready for production, you register the flow.
`client.create_flow_run` takes a flow ID to start, so you need to register the flow before you can use it.
After registration, you can start a flow by clicking the “Quick Run” button in the UI, which will attempt to pass the flow to an agent. In your case, you want to spin up the local agent that will execute the flow. Through the CLI, that would be `prefect agent local start`. This agent will pick up and execute flow runs, including scheduled ones. Just make sure the agent labels match the flow labels.
Michael Warnock
07/27/2021, 2:47 PM
Kevin Kho
07/27/2021, 2:50 PM
Michael Warnock
07/27/2021, 2:54 PM
`create_flow_run`: I see no attempt to spin up or connect to the Dask cluster (which log would that appear in? I also see no indication on the Coiled dashboard that a cluster has been started, or any new EC2 instances), and the docker-agent logs are full of errors related to S3 permissions that I don't get if the executor isn't specified.
`flow.run`: it actually works, modulo a thread-safety issue I'm in the process of fixing.
Kevin Kho
07/27/2021, 2:59 PM
Michael Warnock
07/27/2021, 3:03 PM
Kevin Kho
07/27/2021, 3:08 PM
Michael Warnock
07/27/2021, 3:11 PM
Kevin Kho
07/27/2021, 3:17 PM
`flow.run()` will not spin up the container locally to run the flow, so I think the environment variables on your local machine are providing that authentication. Those variables are not in the container, which is why the errors are happening.
Michael Warnock
07/27/2021, 3:23 PM
`create_flow_task` without the dask-executor. If I fail to provide the credentials, I get an error along the lines of "credentials not found". If I do a `flow.run(executor=my-dask-executor)`, an image is built by Coiled and run on the cluster it spins up. These tasks access S3 just fine. So something other than the presence of my credentials is wrong.
Michael Adkins
07/27/2021, 3:41 PM
Michael Warnock
07/27/2021, 3:44 PM
Michael Adkins
07/27/2021, 3:48 PM
Michael Warnock
07/27/2021, 3:48 PM
Michael Adkins
07/27/2021, 3:51 PM
`stored_as_script=True` with your Docker storage?
Michael Warnock
07/27/2021, 3:52 PM
from prefect import Client
from prefect.storage import Docker
flow = ford.flow
executor = ford.get_coiled_executor(image_uri=image_uri, region_name=worker_config.region_name)
flow.executor = executor
#flow.run_config = ECSRun(image=docker_image)
#flow.run_config = DockerRun()#image=docker_image)
flow_id = flow.register(project_name='feature-generator')
prefect_client = Client()
prefect_client.create_flow_run(
flow_id=flow_id,
parameters=dict(job_spec=job_spec),
)
#flow.run(executor=executor, parameters=dict(job_spec=job_spec))
flow.storage = Docker(
dockerfile="./Dockerfile",
prefect_directory="/usr/src/app",
stored_as_script=True,
path="/usr/src/app/feature_generator/ford.py"
)
Michael Adkins
07/27/2021, 3:55 PM
`/usr/src/app/feature_generator/ford.py` and then extracting the flow from the variables in the file. Since you are setting the executor in a different file, the executor is never set on your flow run.
Michael Warnock
07/27/2021, 3:56 PM
Michael Adkins
07/27/2021, 3:57 PM
`flow.register` and some aren't. Executors are not persisted to allow more customizable options (i.e., we don't have to know how to serialize/deserialize them).
Michael Warnock
07/27/2021, 4:04 PM
Michael Adkins
07/27/2021, 4:07 PM
import os
executor = ford.get_coiled_executor(image_uri=os.environ.get("IMAGE_URI"), region_name=worker_config.region_name)
flow.executor = executor
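Michael Adkins explains above that with `stored_as_script=True`, the flow is rebuilt at run time by running `ford.py` and taking the flow from the file's variables. An executor attached in another file is therefore lost, while one set inside `ford.py` (as in the snippet just above) is re-applied every time the file is loaded.

The sketch below reproduces that effect using only the standard library, under these assumptions:
- `runpy.run_path` stands in for Prefect's script loading.
- A `SimpleNamespace` stands in for the `Flow` object.
- The script contents and the image URI are hypothetical.

```python
import os
import runpy
import tempfile
import textwrap
from pathlib import Path

# Hypothetical flow script; SimpleNamespace stands in for a Prefect Flow.
FLOW_SCRIPT = textwrap.dedent(
    """
    import os
    from types import SimpleNamespace

    flow = SimpleNamespace(name="feature-generator", executor=None)
    # Set inside the script, so it is re-applied every time the file is executed.
    flow.executor = {"image_uri": os.environ.get("IMAGE_URI")}
    """
)


def load_flow(path: str):
    """Execute the script from scratch and return its module-level `flow` variable."""
    return runpy.run_path(path)["flow"]


with tempfile.TemporaryDirectory() as tmp:
    script = Path(tmp) / "ford.py"
    script.write_text(FLOW_SCRIPT)

    # Registration side: overriding the executor only changes this in-memory copy.
    registered = load_flow(str(script))
    registered.executor = "set-in-another-file"

    # Run side: the environment (e.g. from the run config) supplies IMAGE_URI,
    # and the flow is rebuilt from the file, so only the file's executor survives.
    os.environ["IMAGE_URI"] = "registry.example/feature-generator:latest"
    at_run_time = load_flow(str(script))

print(registered.executor)   # set-in-another-file
print(at_run_time.executor)  # {'image_uri': 'registry.example/feature-generator:latest'}
```

On the Prefect side, 1.x run configs such as `DockerRun` accept an `env` dict. That is one way to get `IMAGE_URI` into the run environment, as suggested later in the thread.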
Michael Warnock
07/27/2021, 4:09 PM
Michael Adkins
07/27/2021, 4:39 PM
Michael Warnock
07/27/2021, 6:14 PM
Michael Adkins
07/27/2021, 6:16 PM
Michael Warnock
07/27/2021, 6:19 PM
Michael Adkins
07/27/2021, 6:21 PM
Michael Warnock
07/27/2021, 6:21 PM
Michael Adkins
07/27/2021, 6:22 PM
Michael Warnock
07/27/2021, 6:26 PM
Michael Adkins
07/27/2021, 6:28 PM
Michael Warnock
07/27/2021, 6:29 PM
Michael Adkins
07/27/2021, 6:32 PM
`RunConfig` with the environment variable set to the URI from CI
Michael Warnock
07/27/2021, 6:35 PM
Michael Adkins
07/27/2021, 6:40 PM
`build()` / `register(build=False)` steps if you're married to Docker storage.
Michael Warnock
07/27/2021, 6:42 PM
Michael Adkins
07/27/2021, 6:43 PM
`DockerRun` after you register the flow, it will not be attached to the flow.
Michael Warnock
07/27/2021, 6:43 PM
Michael Adkins
07/27/2021, 6:44 PM
Michael Warnock
07/27/2021, 6:45 PM
Michael Adkins
07/27/2021, 6:46 PM
Michael Warnock
07/27/2021, 8:04 PM