Sarita Patel
12/02/2021, 3:46 PM
Tom Shaffner
12/02/2021, 4:46 PM
Tom Klein
12/02/2021, 8:44 PM
• Run configuration (Local, Universal, Docker, Kubernetes)
• an Agent (e.g. Local, Kubernetes, Docker)
• and an Executor (Local, LocalDask, and DaskExecutor)
If I understand correctly:
• our Agent is the entity (an always-running program, basically) which communicates with the Prefect server (and its scheduler, etc.) and brings jobs to the execution phase. If it lives in Kubernetes, then by default it executes flows as Kubernetes jobs, but if we were to pick a "local" Run configuration, it would run the flow on the agent itself?
• the Run Configuration determines where the Agent runs the flow (e.g., if we picked a Docker run_configuration for our Docker agent, would it run as a docker image inside the docker image? or alongside it, as a "sibling" docker instance?)
• the Executor determines how the flow should be executed, so for example if we had a LocalDask executor running in a Docker run_config with a Docker Agent, it would spin up a local Dask cluster inside the container that's running the job? but if we picked a DaskExecutor, then the flow would actually be executed outside the container that's running the flow? (assuming our Dask cluster runs alongside our Docker daemon, e.g. on EC2)
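For concreteness, a minimal sketch of how these three pieces attach to a flow in Prefect 0.x/1.x (the image name is hypothetical):

from prefect import Flow, task
from prefect.run_configs import DockerRun
from prefect.executors import LocalDaskExecutor

@task
def say_hi():
    print("hi")

with Flow("example") as flow:
    say_hi()

# Run config: *where* the flow run happens; here, in a container
# started by the agent (image name is hypothetical).
flow.run_config = DockerRun(image="my-org/my-image:latest")

# Executor: *how* tasks inside that run execute; here, a local Dask
# pool spun up inside that same container.
flow.executor = LocalDaskExecutor()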
Did I get it right, or am I missing something? 😄
Tilak Maddy
12/02/2021, 10:19 PM
Billy McMonagle
12/02/2021, 10:35 PM
Tom Shaffner
12/03/2021, 12:28 AM
Sandip Viradiya
12/03/2021, 5:47 AM
ECSAgent(
    cluster="NAME OF THE CLUSTER",
    task_role_arn="ROLE ARN"
)
The above code works perfectly for one Prefect account but gives the error below for another account. Can anyone please guide me on what I am missing?
ValueError: Failed to infer default networkConfiguration, please explicitly configure using --run-task-kwargs
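For reference, a hedged sketch of what that explicit configuration might look like; the subnet and security-group IDs are placeholders for values from the second account's VPC:

from prefect.agent.ecs import ECSAgent

# The error suggests Prefect could not find a default VPC in this
# account to infer networkConfiguration from, so pass it explicitly.
agent = ECSAgent(
    cluster="NAME OF THE CLUSTER",
    task_role_arn="ROLE ARN",
    run_task_kwargs={
        "networkConfiguration": {
            "awsvpcConfiguration": {
                "subnets": ["subnet-xxxxxxxx"],     # placeholder
                "securityGroups": ["sg-xxxxxxxx"],  # placeholder
                "assignPublicIp": "ENABLED",
            }
        }
    },
)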
I did not need any networkConfiguration for the first account. I am using Prefect 0.15.9.
Vadym Dytyniak
12/03/2021, 10:13 AM
Tilak Maddy
12/03/2021, 10:26 AM
UserWarning: No result handler was specified on your Flow. Cloud features such as input caching and resuming task runs from failure may not work properly. registered_flow = client.register(
That's the whole warning message, and here is the code. It worked properly when I triggered the run from Prefect Cloud; there was no problem in execution. But I want to be able to resume tasks if they fail. What do I do?
import os
import time

import prefect
from prefect import task, Flow, Parameter
from prefect.executors import LocalDaskExecutor
from prefect.run_configs import LocalRun
from prefect.storage import GitHub

@task
def say_hello(name):
    # Add a sleep to simulate some long-running task
    time.sleep(3)
    # Load the greeting to use from an environment variable
    greeting = os.environ.get("GREETING")
    logger = prefect.context.get("logger")
    logger.info(f"{greeting}, {name}!")

with Flow("hello-flow") as flow:
    people = Parameter("people", default=["Arthur", "Ford", "Marvin"])
    say_hello.map(people)

flow.storage = GitHub(
    repo="XXX/test-repo",
    path="learning_storage.py",
    access_token_secret="XXX",
)
flow.run_config = LocalRun(env={"GREETING": "Hello from User 2 "}, labels=["dev"])
flow.executor = LocalDaskExecutor()
flow.register(project_name="test_user_2")
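One way to address the warning in Prefect 0.x is to attach a result to the flow so task outputs are checkpointed, which is what resuming from failure relies on. A minimal sketch, assuming local disk is acceptable (the directory is hypothetical; S3Result, GCSResult, etc. work the same way):

from prefect.engine.results import LocalResult

# Passing a result to the Flow constructor persists task outputs,
# enabling Cloud features like restarting from failed task runs.
with Flow("hello-flow", result=LocalResult(dir="/tmp/prefect-results")) as flow:
    people = Parameter("people", default=["Arthur", "Ford", "Marvin"])
    say_hello.map(people)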
I ran this on my local machine, and yes, I have a copy of the flow in the mentioned GitHub repo too.
Tilak Maddy
12/03/2021, 11:42 AM
Bruno Murino
12/03/2021, 1:04 PM
Jacob Blanco
12/03/2021, 2:11 PM
Casey Green
12/03/2021, 3:24 PM
An Hoang
12/03/2021, 4:02 PM
ShellTask("cd directory/to/switch/to")
? so that all of the results outputted are now `directory/to/switch/to/result`?
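For what it's worth, a sketch of one approach: ShellTask accepts a helper_script that runs in the same shell before each command, which I believe is the usual place for a cd (the path is just the one from the question):

from prefect.tasks.shell import ShellTask

# helper_script executes before every command in the same shell
# session, so each invocation starts inside the target directory.
shell = ShellTask(helper_script="cd directory/to/switch/to")

If the directory is only known at runtime, I believe helper_script can also be passed when the task is called inside the flow rather than at construction time.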
My directory/to/switch/to/ is templated at runtime and is the result of a task. Right now I have to pass this path as a parameter to all subsequent tasks. I'm wondering if there's a more efficient, less error-prone way to do this.
Casey Green
12/03/2021, 4:27 PM
OrionClient.create_deployment(...)
In both cases, it supposedly succeeds, but it's not showing up in the UI. I've also tried copying the simple example in the docs verbatim, but no dice.
$ prefect version
2.0a5
$ prefect deployment create ./my_flow_deployment.py
Loading deployments from python script at 'my_flow_deployment.py'...
Created deployment 'my-first-deployment' for flow 'Addition Machine'
note: I'm able to run flows and see them show up in the UI.
brian
12/03/2021, 7:07 PM
Erik Amundson
12/03/2021, 10:26 PM
docker run -v "//.pipe/<named_pipe>://.pipe/<named_pipe>" --rm -it <image>
It also doesn't throw any error when we start the prefect agent with:
prefect agent docker start --label <agent_label> --volume "//.pipe/<named_pipe>://.pipe/<named_pipe>"
but once we try to actually run the workflow it fails with this message:
Internal Server Error ("invalid volume specification: '\\.\pipe\<named_pipe>:\pipe\<named_pipe>:/pipe/<named_pipe>:/pipe/<named_pipe>:rw'")
I'm not really sure why it seems to be duplicating the volume map and removing some of the forward slashes. Is this supported at all? Worst case, we can probably subclass the Docker agent and hardcode the run_flow() command, but we'd like to avoid having extra code on the agent side.
saml
12/03/2021, 10:26 PMFailed to load and execute Flow's environment: ModuleNotFoundError("No module named '/home/ec2-user/'")
Leon Kozlowski
12/03/2021, 11:36 PMregister
step
Registering <flow-name>... Skipped (metadata unchanged)
What does this check take into account? Must I use the --force command?
alins
12/04/2021, 8:07 AMimport datetime
import os
import pendulum
import time
from prefect import Flow, task
from prefect.schedules import CronSchedule
from prefect.tasks.airtable import WriteAirtableRow
from prefect.tasks.github import GetRepoInfo
from prefect.triggers import any_failed
@task
def my_process():
time.sleep(70)
daily_schedule = CronSchedule("*/1 * * * *")
with Flow("sleep more than cron", schedule=daily_schedule) as flow:
my_process()
flow.run()
When I run this with python3 flow.py, it runs and waits the 70 seconds until the task ends before starting again (so each task effectively runs every two minutes). But I want it to run every minute, without caring how long the process takes to finish.
Tilak Maddy
12/04/2021, 10:49 AM
flow.run_config = VertexRun(
    image="example/my-custom-private-image:latest",
    machine_type="e2-highmem-8",
)
https://docs.prefect.io/orchestration/flow_config/run_configs.html#vertexrun
alins
12/04/2021, 12:59 PM
When I run prefect server start, it says:
ERROR: Couldn't connect to Docker daemon at http+docker://localhost - is it running?
but Docker is up and running. Could someone help me, please?
The full error:
amir@server:~$ prefect server start
ERROR: Couldn't connect to Docker daemon at http+docker://localhost - is it running?
If it's at a non-standard location, specify the URL with the DOCKER_HOST environment variable.
Exception caught; killing services (press ctrl-C to force)
ERROR: Couldn't connect to Docker daemon at http+docker://localhost - is it running?
If it's at a non-standard location, specify the URL with the DOCKER_HOST environment variable.
Traceback (most recent call last):
File "/home/amir/.local/lib/python3.8/site-packages/prefect/cli/server.py", line 623, in start
subprocess.check_call(
File "/usr/lib/python3.8/subprocess.py", line 364, in check_call
raise CalledProcessError(retcode, cmd)
subprocess.CalledProcessError: Command '['docker-compose', 'pull']' returned non-zero exit status 1.
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/home/amir/.local/bin/prefect", line 8, in <module>
sys.exit(cli())
File "/usr/lib/python3/dist-packages/click/core.py", line 764, in __call__
return self.main(*args, **kwargs)
File "/usr/lib/python3/dist-packages/click/core.py", line 717, in main
rv = self.invoke(ctx)
File "/usr/lib/python3/dist-packages/click/core.py", line 1137, in invoke
return _process_result(sub_ctx.command.invoke(sub_ctx))
File "/usr/lib/python3/dist-packages/click/core.py", line 1137, in invoke
return _process_result(sub_ctx.command.invoke(sub_ctx))
File "/usr/lib/python3/dist-packages/click/core.py", line 956, in invoke
return ctx.invoke(self.callback, **ctx.params)
File "/usr/lib/python3/dist-packages/click/core.py", line 555, in invoke
return callback(*args, **kwargs)
File "/home/amir/.local/lib/python3.8/site-packages/prefect/cli/server.py", line 664, in start
subprocess.check_output(
File "/usr/lib/python3.8/subprocess.py", line 415, in check_output
return run(*popenargs, stdout=PIPE, timeout=timeout, check=True,
File "/usr/lib/python3.8/subprocess.py", line 516, in run
raise CalledProcessError(retcode, process.args,
subprocess.CalledProcessError: Command '['docker-compose', 'down']' returned non-zero exit status 1.
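A couple of things worth checking, sketched under the assumption that the daemon is local and the current user simply cannot reach its socket (a common cause of this exact error):

# Can the current user talk to the daemon at all?
docker info
# If that fails with a permission error, add the user to the docker
# group (then log out and back in):
sudo usermod -aG docker $USER
# If the daemon listens somewhere non-standard, point Prefect at it
# explicitly, as the error message suggests (path is an example):
export DOCKER_HOST=unix:///var/run/docker.sock
prefect server start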
Oliver Mannion
12/05/2021, 11:56 AM
Frank Oplinger
12/05/2021, 5:15 PM
Tilak Maddy
12/05/2021, 5:43 PM
Hugo Shi
12/05/2021, 9:26 PM
Tim Micheletto
12/06/2021, 5:01 AM
The SnowflakeQuery task only supports fetch_all, so I have used the Snowflake connector directly in my flow. I'm getting some strange issues trying to map over the results of the query and distribute them across child tasks with a Dask executor. If I try to use fetch_pandas_batches, I get an error indicating the result of that function is not pickleable. Fair enough, that is a known limitation, but if I use get_result_batches and call to_pandas in a mapped task, I get some weird behavior. The flow fails with the following error:
Unexpected error: KilledWorker('predict-3815-fc5979b578154850ae381b6b7286993a', <WorkerState 'tcp://10.144.173.27:38363', name: 4, status: closed, memory: 0, processing: 625>)
Traceback (most recent call last):
File "/usr/local/lib/python3.8/site-packages/prefect/engine/runner.py", line 48, in inner
new_state = method(self, state, *args, **kwargs)
File "/usr/local/lib/python3.8/site-packages/prefect/engine/flow_runner.py", line 643, in get_flow_run_state
final_states = executor.wait(
File "/usr/local/lib/python3.8/site-packages/prefect/executors/dask.py", line 440, in wait
return self.client.gather(futures)
File "/usr/local/lib/python3.8/site-packages/distributed/client.py", line 1977, in gather
return self.sync(
File "/usr/local/lib/python3.8/site-packages/distributed/client.py", line 865, in sync
return sync(
File "/usr/local/lib/python3.8/site-packages/distributed/utils.py", line 327, in sync
raise exc.with_traceback(tb)
File "/usr/local/lib/python3.8/site-packages/distributed/utils.py", line 310, in f
result[0] = yield future
File "/usr/local/lib/python3.8/site-packages/tornado/gen.py", line 762, in run
value = future.result()
File "/usr/local/lib/python3.8/site-packages/distributed/client.py", line 1842, in _gather
raise exception.with_traceback(traceback)
distributed.scheduler.KilledWorker: ('predict-3815-fc5979b578154850ae381b6b7286993a', <WorkerState 'tcp://10.144.173.27:38363', name: 4, status: closed, memory: 0, processing: 625>)
and after 10 minutes each mapped child task fails with this error:
No heartbeat detected from the remote task; marking the run as failed.
Here's what it looks like.
import sys
from typing import Any, Dict, List

import prefect
import snowflake.connector
from prefect import Flow, task
from prefect.executors import DaskExecutor
from prefect.run_configs import KubernetesRun
from prefect.storage import S3

# `image`, `project_pip_url`, `make_executor`, and `get_secret_string`
# are project-specific and defined elsewhere.

conn_info = {
    "account": "some-company.us-east-1",
    "user": "some-user",
    "role": "some-role",
    "warehouse": "some-warehouse",
    "client_session_keep_alive": True,
}

@task()
def fetch_org_batches(conn_info: Dict) -> List[Any]:
    sa_password = get_secret_string("some-key")
    conn_info["password"] = sa_password
    sql = "select * from invoices_with_statement_and_org_info"
    with snowflake.connector.connect(**conn_info) as conn:
        with conn.cursor() as cur:
            cur.execute(sql)
            batches = cur.get_result_batches()
            num_result_batches = len(batches)
            logger = prefect.context.get("logger")
            logger.info(f"{num_result_batches} batches retrieved")
            return batches

@task()
def predict(batch: Any) -> int:
    if batch is not None:
        df = batch.to_pandas()
        logger = prefect.context.get("logger")
        logger.info(df["id"])
        return len(df.index)
    return 0

@task()
def total(x: Any) -> None:
    logger = prefect.context.get("logger")
    logger.info(f"{x} passed to total")
    logger.info(f"{sum(x)} rows in total")

def main(branch_name: str) -> None:
    with Flow(
        f"bills-invoices-inference-{branch_name}",
        executor=DaskExecutor(
            cluster_class=lambda: make_executor(image, project_pip_url),
            adapt_kwargs={"minimum": 1, "maximum": 10},
        ),
    ) as flow:
        batches = fetch_org_batches(conn_info)
        mapped = predict.map(batches)
        total(mapped)

    flow.storage = S3(bucket="some-bucket")
    flow.run_config = KubernetesRun(
        env={
            "PREFECT__LOGGING__LEVEL": "INFO",
            # Install the latest version of this code from the relevant branch on github.
            # Allows for faster iteration without having to rebuild the docker image.
            "EXTRA_PIP_PACKAGES": f"dask_kubernetes==2021.10.0 {project_pip_url}",
        },
        image=image,
        labels=[],  # TODO: variable per branch normal vs sandbox (also need label on agent)
        cpu_limit=2,
        cpu_request=2,
        memory_limit="5Gi",
    )
    flow.register(project_name="cups")
    # flow.run()

if __name__ == "__main__":
    branch = sys.argv[1]
    main(branch)
Any ideas what's going on here? Is it possible to support this use case currently?
Didier Marin
12/06/2021, 9:50 AM
Joël Luijmes
12/06/2021, 10:00 AM
> prefect run -m src.flows.internal_syncs.firestore_export.flow
Retrieving local flow... Error
Found no flows at src.flows.internal_syncs.firestore_export.flow.
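A guess, sketched below: prefect run -m discovers Flow objects defined at the module's top level, so a flow built only inside a function won't be found unless it is also assigned at module scope (the flow name and structure here are hypothetical):

# src/flows/internal_syncs/firestore_export/flow.py
from prefect import Flow

def build_flow() -> Flow:
    with Flow("firestore-export") as flow:
        ...
    return flow

# Assumption: exposing the flow at module level is what lets
# `prefect run -m <module>` find it.
flow = build_flow()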
Vadym Dytyniak
12/06/2021, 10:41 AM
Vadym Dytyniak
12/06/2021, 10:41 AM
Anna Geller
12/06/2021, 10:52 AM
Vadym Dytyniak
12/06/2021, 10:52 AM
Anna Geller
12/06/2021, 10:52 AM
Kevin Kho
12/06/2021, 2:45 PM
DefaultFlow class that contains your defaults.
Vadym Dytyniak
12/06/2021, 6:38 PM
Kevin Kho
12/06/2021, 6:39 PM
Vadym Dytyniak
12/06/2021, 6:40 PM
Kevin Kho
12/06/2021, 6:42 PM
Vadym Dytyniak
12/06/2021, 6:42 PM
Kevin Kho
12/06/2021, 8:45 PM
Vadym Dytyniak
12/06/2021, 8:57 PM
Kevin Kho
12/06/2021, 8:58 PM
Vadym Dytyniak
12/06/2021, 8:59 PM
Kevin Kho
12/06/2021, 9:01 PM
Vadym Dytyniak
12/06/2021, 9:04 PM
Kevin Kho
12/06/2021, 9:08 PM
prefect execute flow
Vadym Dytyniak
12/07/2021, 9:35 AM
Anna Geller
12/07/2021, 9:56 AM
Vadym Dytyniak
12/07/2021, 10:04 AM
Storage:
return S3(
    bucket=Secret('PREFECT_BUCKET').get(),
    key=f'{self.name}/{self.serialized_hash()}',
    secrets=['AWS_CREDENTIALS'],
)
Run config:
ECSRun(
    image='custom_image',
    env={
        'PIP_EXTRA_INDEX_URL': os.environ['PIP_EXTRA_INDEX_URL'],
        'EXTRA_PIP_PACKAGES': ' '.join(self.dependencies),
    },
)
Custom image:
FROM base image
# install tini
ENV TINI_VERSION v0.19.0
ADD https://github.com/krallin/tini/releases/download/${TINI_VERSION}/tini /tini
RUN chmod +x /tini
ENTRYPOINT ["/tini", "--"]
# install prefect
ENV PREFECT_VERSION 0.15.10
RUN python3.9 -m pip install prefect[aws]==${PREFECT_VERSION}
# update alternatives
RUN update-alternatives --install /usr/bin/python python /usr/bin/python3.9 10
RUN update-alternatives --install /usr/bin/python3 python3 /usr/bin/python3.9 10
# prepare entrypoint script
COPY prefect_entrypoint.sh /usr/local/bin/entrypoint.sh
RUN chmod +x /usr/local/bin/entrypoint.sh
CMD ["entrypoint.sh"]
entrypoint.sh copied from the pure Prefect image
Anna Geller
12/07/2021, 10:11 AM
# prepare entrypoint script
COPY prefect_entrypoint.sh /usr/local/bin/entrypoint.sh
RUN chmod +x /usr/local/bin/entrypoint.sh
CMD ["entrypoint.sh"]
Some additional notes:
I usually see ENV defined with an = sign, e.g.
ENV PREFECT_VERSION=0.15.10
And then I think this may need to be in quotation marks (not 100% sure):
pip install "prefect[aws]"
If you need some dependencies from this custom image, you can do a multi-stage build, and in the last part use:
FROM prefecthq/prefect:latest
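To illustrate the multi-stage idea (the stage name and paths are hypothetical, not from the thread):

# Stage 1: start from the custom base and collect what you need.
FROM my-base-image AS build
# ... install or build dependencies here ...

# Stage 2: end on the official image so the tini/entrypoint setup
# Prefect expects comes for free.
FROM prefecthq/prefect:latest
COPY --from=build /opt/deps /opt/deps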
Vadym Dytyniak
12/07/2021, 10:13 AM
Anna Geller
12/07/2021, 10:17 AM
Vadym Dytyniak
12/07/2021, 10:24 AM
Anna Geller
12/07/2021, 11:48 AM
Vadym Dytyniak
12/07/2021, 12:16 PM
Anna Geller
12/07/2021, 12:18 PM
Vadym Dytyniak
12/07/2021, 12:39 PM
Anna Geller
12/07/2021, 12:42 PM
Vadym Dytyniak
12/07/2021, 12:43 PM
Anna Geller
12/07/2021, 12:43 PM
Vadym Dytyniak
12/07/2021, 3:10 PM
Anna Geller
12/07/2021, 3:26 PM
Vadym Dytyniak
12/07/2021, 3:27 PM