Adisun Wheelock
04/22/2020, 4:32 PM
Async tasks:
from prefect import task, Flow

@task
async def extract():
    return [1, 2, 3]

@task
async def transform(some_list):
    transformed = [x + 1 for x in some_list]
    return transformed

@task
async def load(transformed):
    print('load somewhere')

with Flow('async testing') as flow:
    extract_nums = extract()
    transformed = transform(extract_nums)
    load(transformed)

flow.run()
This creates a flow and all, but how do I actually run this flow asynchronously?
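A minimal sketch of one common workaround, assuming Prefect 0.x (whose task runners call tasks synchronously rather than awaiting them): drive the coroutine to completion inside a regular task with asyncio.run. The helper name _extract is made up for illustration.

import asyncio
from prefect import task, Flow

async def _extract():
    # plain coroutine, not a Prefect task
    return [1, 2, 3]

@task
def extract():
    # Prefect 0.x runs tasks synchronously, so the coroutine is
    # driven to completion here rather than by the flow runner
    return asyncio.run(_extract())

with Flow('async testing') as flow:
    extract()

flow.run()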
Philip Blankenau
04/22/2020, 11:42 PM
Vitor Avancini
04/23/2020, 1:20 PM
Hugh Cameron
04/28/2020, 11:33 AM
~# prefect diagnostics
{
  "config_overrides": {},
  "env_vars": [
    "PREFECT__SERVER__UI__GRAPHQL_URL"
  ],
  "system_information": {
    "platform": "Linux-3.10.105-x86_64-with-glibc2.2.5",
    "prefect_version": "0.10.4",
    "python_version": "3.8.1"
  }
}
~# prefect agent start docker --network prefect-server --label NAS --label Docker
 ____            __           _        _                    _
|  _ \ _ __ ___ / _| ___  ___| |_     / \   __ _  ___ _ __ | |_
| |_) | '__/ _ \ |_ / _ \/ __| __|   / _ \ / _` |/ _ \ '_ \| __|
|  __/| | |  __/  _|  __/ (__| |_   / ___ \ (_| |  __/ | | | |_
|_|   |_|  \___|_|  \___|\___|\__| /_/   \_\__, |\___|_| |_|\__|
                                           |___/
[2020-04-28 11:29:17,055] INFO - agent | Starting DockerAgent with labels ['NAS', 'Docker']
[2020-04-28 11:29:17,056] INFO - agent | Agent documentation can be found at <https://docs.prefect.io/orchestration/>
[2020-04-28 11:29:17,056] INFO - agent | Agent connecting to the Prefect API at <http://localhost:4200>
[2020-04-28 11:29:17,078] INFO - agent | Waiting for flow runs...
Any tips to troubleshoot?
Jeff Brainerd
04/29/2020, 12:05 PM
David Ojeda
04/29/2020, 1:47 PM
Matias Godoy
04/29/2020, 2:23 PM
prefect python package might be a little overkill.
Yufei
05/01/2020, 8:29 PM
Josep Consuegra Navarrina
05/11/2020, 3:01 PM
Roy Trostyanetski
05/12/2020, 3:54 PM
Mark Baker
05/18/2020, 6:14 PM
Roy Segall
05/21/2020, 4:49 AM
Roy Trostyanetski
05/25/2020, 6:18 AM
Tuan Nguyen
06/12/2020, 8:12 AM
Aamir Butt
06/19/2020, 10:43 AM
Philip MacMenamin
06/19/2020, 5:39 PM
Philip MacMenamin
06/23/2020, 8:50 PM
A flow that:
• downloads a file with urllib and saves it in $RUN_LOC/my_file
• execs a task which runs a bash command, e.g. wc -l $RUN_LOC/my_file > file_len
Obviously the tasks here are silly. My aim is to have an example where a series of tasks does work on files. The task results are almost unimportant; the result might be nothing more than RAN_OK/NOT_OK. What matters to me is having a mechanism to operate on files, shell out to other utils that operate on files, and check their return status.
I've been looking at https://docs.prefect.io/core/concepts/results.html#how-to-configure-task-result-persistence
from prefect import task, Flow
from prefect.engine.results import LocalResult

@task(result=LocalResult(location="initial_data.prefect"))
def root_task():
    return [1, 2, 3]

@task(result=LocalResult(location="{date:%A}/{task_name}.prefect"))
def downstream_task(x):
    return [i * 10 for i in x]

with Flow("local-results") as flow:
    downstream_task(root_task)
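For the download-then-shell pattern described above, a minimal sketch assuming Prefect 0.x and its built-in ShellTask, which fails the task on a non-zero exit status; the URL, the RUN_LOC value, and the helper names are illustrative, not from the original message.

import urllib.request
from prefect import task, Flow
from prefect.tasks.shell import ShellTask

RUN_LOC = "/tmp/run"  # illustrative working directory, assumed to exist

@task
def download(url):
    # fetch the file and save it under RUN_LOC
    dest = f"{RUN_LOC}/my_file"
    urllib.request.urlretrieve(url, dest)
    return dest

@task
def build_command(path):
    # the shell command is assembled at runtime, once the path exists
    return f"wc -l {path} > {RUN_LOC}/file_len"

# ShellTask raises FAIL on a non-zero exit status, so the flow run
# reflects the return code of the shelled-out utility
shell = ShellTask()

with Flow("file-pipeline") as flow:
    path = download("https://example.com/data.txt")
    shell(command=build_command(path))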
itay livni
06/24/2020, 1:28 AM
Is it OK to use prefect.environments.storage.docker as the mechanism to build and push containers while using Non-Docker Storage for Containerized Environments? (I ran into a bug deploying this, and am wondering if this might be the reason.)
flow.storage = S3(
    bucket="s3-prefect-flow-storage",
    secrets=["AWS_CREDENTIALS"],
)

docker = Docker(
    registry_url=ecr_repo_url,
    python_dependencies=["pandas", ...],
    dockerfile=docker_flpth,
    image_name="annoying_docker",
    image_tag="latest",
    local_image=True,
)
docker.build(push=True)
Peter B
06/28/2020, 2:51 AM
I had been using a mapped S3Upload/S3Download task for uploading and downloading JSON at each stage. Then I realized that beyond the file size, S3 charges also happen per request (which I was making to check which files had been uploaded at each stage, then downloading files that were not yet processed). So rather than make potentially several thousand download requests to S3, I came up with a task that compresses a bunch of JSON-serializable objects and uploads that compressed file to S3 instead. It avoids the mapped S3Upload/S3Download and a lot of requests (and saves a lot of $$$).
I'm happy with the workflow, so no real problem at hand, but a few questions just out of curiosity:
1. Has anyone done something similar to this?
2. Is there a Prefect task I'm missing that would handle this better?
3. Does anyone have a better way of doing this?
Here's the task (which gets passed to an S3Upload task next in the flow):
import json
import tarfile
from contextlib import closing
from io import BytesIO
from pathlib import Path
from tempfile import NamedTemporaryFile
from typing import Any, Dict, List

from prefect import task

@task
def compress_json_serializable_objects(
    json_serializable_objects: List[Dict[str, Any]],
    object_names: List[str],
    compression="xz",
):
    if len(json_serializable_objects) != len(object_names):
        raise ValueError(
            f"json_serializable_objects (len={len(json_serializable_objects)}) "
            f"and object_names (len={len(object_names)}) not equal"
        )
    with NamedTemporaryFile() as tmp:
        # write each object as a member of a compressed tar archive
        with tarfile.open(tmp.name, f"w:{compression}") as tf:
            for obj, name in zip(json_serializable_objects, object_names):
                with closing(BytesIO(json.dumps(obj).encode())) as fobj:
                    tarinfo = tarfile.TarInfo(name)
                    tarinfo.size = len(fobj.getvalue())
                    tf.addfile(tarinfo, fileobj=fobj)
        # read the archive back while the temp file still exists
        upload_data = Path(tmp.name).read_bytes()
    return upload_data
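For context, a hypothetical wiring of the task above into a flow; the bucket name, key, and payloads are made up, and S3Upload refers to the built-in prefect.tasks.aws.s3.S3Upload task Peter mentions.

from prefect import Flow
from prefect.tasks.aws.s3 import S3Upload

upload = S3Upload(bucket="my-bucket")  # made-up bucket name

objs = [{"id": 1}, {"id": 2}]               # illustrative payloads
names = ["record_1.json", "record_2.json"]

with Flow("compress-and-upload") as flow:
    data = compress_json_serializable_objects(objs, names)
    # one upload request for the whole batch instead of thousands
    upload(data=data, key="batch-001.tar.xz")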
An Hoang
07/07/2020, 2:55 PM
james.lamb
07/20/2020, 3:19 PM
I needed to get the flow_group_id from Prefect Cloud, given a project name and flow name. As far as I understand from this thread, the combination of project name, flow name, and the tenant I'm auth'd as should be enough to uniquely identify a flow_group. This is the first time I've ever used GraphQL, so if anyone has done this or has a better recommendation, I'd welcome it! This was my solution:
from prefect.client import Client

def get_flow_group_id(flow_name, project_name) -> str:
    """
    Get the `flow_group_id` for a flow with a given
    name, from a given Prefect Cloud project.
    """
    client = Client()
    query = """
    query {
      flow(
        where: {
          name: { _eq: "%s" }
        }
      ) {
        id
        name
        flow_group_id
        project_id
      }
      project(
        where: {
          name: { _eq: "%s" }
        }
      ) {
        id
        name
      }
    }
    """ % (flow_name, project_name)
    result = client.graphql(query)
    project_id = result["data"]["project"][0]["id"]
    flow_group_id = [
        flow for flow in
        result["data"]["flow"]
        if flow["project_id"] == project_id
    ][0]["flow_group_id"]
    return flow_group_id
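For anyone adapting this, a hypothetical call (the flow and project names are made up):

flow_group_id = get_flow_group_id(flow_name="daily-etl", project_name="analytics")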
mithalee mohapatra
07/21/2020, 7:15 PM
Ming Fang
07/27/2020, 1:11 PM
Anna Geller (old account)
08/25/2020, 8:29 PM
Anna Geller (old account)
09/04/2020, 8:10 PM
I wrote an article on using the FlowRunTask to trigger single flows in order. This helped my team organize the flows between several layers of jobs in our data warehouse: staging area, business logic, data mart.
I also compare how this can be done in Airflow vs. in Prefect, and the Prefect version proved to be much simpler.
I hope this may be useful to some of you. Greetings from Berlin! 🙂
https://towardsdatascience.com/managing-dependencies-between-data-pipelines-in-apache-airflow-prefect-f4eba65886df
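A minimal flow-of-flows sketch along the lines Anna describes, assuming Prefect 0.x's FlowRunTask; the project and flow names here are invented.

from prefect import Flow
from prefect.tasks.prefect import FlowRunTask

# wait=True blocks until the child flow run finishes,
# so the layers execute strictly in order
staging = FlowRunTask(flow_name="staging-area", project_name="dwh", wait=True)
business = FlowRunTask(flow_name="business-logic", project_name="dwh", wait=True)
mart = FlowRunTask(flow_name="data-mart", project_name="dwh", wait=True)

with Flow("dwh-orchestrator") as flow:
    s = staging()
    b = business(upstream_tasks=[s])
    mart(upstream_tasks=[b])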
Avi A
09/09/2020, 8:58 AM
I built a JupyterTask for internal use, and was wondering if anyone would be interested in such a thing before I make the extra effort of publishing it in a package.
Michael Ludwig
09/16/2020, 8:53 AM
james.lamb
09/16/2020, 10:18 PM
• Start a KubernetesAgent with a few clicks in the Saturn UI
• Author your flow in a Saturn-managed instance of Jupyter Lab
• Register the flow with Prefect Cloud. A library called prefect-saturn (https://github.com/saturncloud/prefect-saturn) adds an environment and storage to your flow that says "hey, run this on one of my Saturn Dask clusters"
• Whenever your flow runs, it creates a Dask cluster for itself programmatically. All your dependencies, files, and necessary credentials will be available on all of the Dask workers, and all logs and task statuses get sent back to Prefect Cloud.
More details about the architecture and links to sample code are available at https://www.saturncloud.io/docs/connecting/tools/prefect-cloud/.
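Based on the prefect-saturn README, registration looks roughly like the sketch below; the flow body and project name are made up, and the exact API should be checked against the linked repo.

from prefect import Flow, task
from prefect_saturn import PrefectCloudIntegration

@task
def hello():
    print("hello from a Saturn Dask cluster")

flow = Flow("sample-flow", tasks=[hello])

# attaches the environment and storage that point the flow
# at a Saturn-managed Dask cluster
integration = PrefectCloudIntegration(prefect_cloud_project_name="sample-project")
flow = integration.register_flow_with_saturn(flow)
flow.register(project_name="sample-project")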
I want to thank the prefect team for the care and attention they've given to keeping prefect modular. We were able to write a client library like this (and another thin wrapper around KubernetesAgent) without needing to manage a lot of code, because of the clear separation of concerns in the prefect library. The maintainers have also been very responsive to my questions in this Slack and to my proposals in issues and pull requests. ❤️
Jackson Maxfield Brown
09/18/2020, 7:07 PM
Result and Serializer classes, and so we are already thinking about how to upgrade those. We have written quite a bit on potential implementation details, so great job Prefect team for getting to it so quickly 🎉