Joey Kellison-Linn
05/14/2021, 4:01 PM
Carlo
05/14/2021, 7:11 PM
`EcsAgent` from `FargateAgent`. I'm specifying the `task_definition_path` as an `s3://url`. It's returning a 404 error. Using the same credentials, I'm able to write my own boto3 code to access the file successfully. Any ideas?
Ian Harvey
05/15/2021, 9:00 AM
Lionel
05/16/2021, 8:34 AM
`GITHUB_ACCESS_TOKEN` to use GitHub as the script storage backend.
Matt Camp
05/17/2021, 1:35 AM
# debug mode
debug = true
# base configuration directory (typically you won't change this!)
home_dir = "~/.prefect/"
backend = "server"
[server]
host = "http://apollo.prefect.campground.lan"
port = "4200"
endpoint = "${server.host}:${server.port}"
My Apollo server is at http://apollo.prefect.campground.lan:4200/
My GraphQL server is at http://graphql.prefect.campground.lan:4200/graphql
My UI is at http://prefect.campground.lan
Any glaring problems with what I've done?
TIA
Lionel
05/17/2021, 4:08 PM
my_query = 'SELECT * FROM my_db LIMIT 10'
MySQLFetch("my_db ", "user_read_only", "Abcd1234", "mysql.test.domain", port=3306, query=my_query)
Ismail Cenik
05/17/2021, 7:38 PM
Tom Forbes
05/17/2021, 8:36 PM
import dask.dataframe
import imageio
import skimage.transform
from prefect import Flow, task

# trigger_export() and get_boto_client() are helpers defined elsewhere.

@task()
def run_export():
    prefix = trigger_export()
    return dask.dataframe.read_parquet(f'{prefix}/*.parquet')

@task()
def download_image(uuid):
    s3 = get_boto_client("s3")
    img = imageio.imread(s3.get_object(Bucket="..", Key=f"{uuid}")["Body"])
    return skimage.transform.resize(img, (1024, 1024))

@task()
def save_to_s3(result_df):
    result_df.save("s3://bucket/output_prefix/")

with Flow("something") as flow:
    df = run_export()
    images_df = download_image.map(df["uuid"])
    save_to_s3(images_df)
Basically I want to grab a dataframe from somewhere, download a bunch of images from S3, resize them, attach them to the dataframe as a new column, and then save the frame somewhere else on S3.
Tom Forbes
05/17/2021, 8:41 PM
`df.apply` directly - is that what you would do? We’d rather use Prefect for this directly if possible. But I’m not clear on how Prefect works with Dask - would a mapping task like this be the way to go? Would this scale to a high number of tasks (millions?), or would you perhaps map over the Dask partitions instead?
Daniel Davee
05/18/2021, 1:23 AM
Lionel
05/18/2021, 6:02 AM
Rehan Razzaque Rajput
05/18/2021, 8:21 AM
Maria
05/18/2021, 12:45 PM
Nitin Karolla
05/18/2021, 2:11 PM
Ismail Cenik
05/18/2021, 3:56 PM
Mdu Keswa
05/18/2021, 6:31 PM
Mdu Keswa
05/18/2021, 6:31 PM
Mdu Keswa
05/18/2021, 7:16 PM
Karl
05/19/2021, 12:41 AM
Michael Hadorn
05/19/2021, 1:17 PM
Michael Hadorn
05/19/2021, 3:35 PM
May 19 17:30:35 XXX.ch run_dev_docker_agent.sh[3306015]: requests.exceptions.ConnectionError: HTTPConnectionPool(host='host.docker.internal', port=4200): Max retries exceeded with url: /graphql (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x7f8572e5ecd0>: Failed to establish a new connection: [Errno -2] Name or service not known'))
(-> full log in the thread.)
Before (<=0.14.17) everything was working.
On Windows it also works with 0.14.19.
But not on:
Ubuntu 20.04
Docker version 20.10.3, build 48d30b5
docker-compose version 1.28.2, build unknown
Has anyone else seen similar behaviour?
Daniel Davee
05/19/2021, 9:14 PM
Daniel Davee
05/19/2021, 9:15 PM
Daniel Davee
05/19/2021, 9:16 PM
Dean Magee
05/19/2021, 11:57 PM
import os
import pymsteams

def alert_failed(obj, old_state, new_state):
    if new_state.is_failed():
        myTeamsMessage = pymsteams.connectorcard(os.getenv("MSTEAMS_WEBHOOK_URL"))
        myTeamsMessage.title("An error has occurred!")
I'm trying to grab the text that Python spits out as an error message and include it in my MS Teams message. Any idea how to do that?
jaehoon
05/20/2021, 1:30 PM
Romain
05/20/2021, 2:01 PM
/A/
    B/
        flows/
            my_flow.py
In my_flow.py, imagine something like this:
def get_flow():
    with Flow('my_flow', storage=Module("B.flows.my_flow:get_flow")) as flow:
        ....
    return flow
In the Dockerfile, I ensure that the `PYTHONPATH` env var includes the folder A so that I can import B.flows.my_flow, by setting this at the end of the Dockerfile:
ENV PYTHONPATH "${PYTHONPATH}:/A"
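A quick way to sanity-check this inside the built image, as a minimal sketch rather than anything from the original setup: ask Python which dotted names it can resolve on the current `sys.path`. The helper name `is_importable` is hypothetical; the module names come from the layout above. With `/A` on `PYTHONPATH`, `B.flows.my_flow` should resolve, while `A` itself should not, since only the contents of `/A` are on the path.

```python
import importlib.util
import sys


def is_importable(module_name: str) -> bool:
    """Return True if `module_name` can be located on the current sys.path."""
    try:
        return importlib.util.find_spec(module_name) is not None
    except ModuleNotFoundError:
        # Raised when a parent package of a dotted name cannot be found.
        return False


if __name__ == "__main__":
    # Names taken from the layout above; run this inside the image.
    for name in ("B.flows.my_flow", "A"):
        print(name, "importable:", is_importable(name))
    print("sys.path:", sys.path)
```

If the first name is not importable in the container that actually executes the flow, the `PYTHONPATH` set in the Dockerfile is probably not the one in effect for that process.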
After that I register this flow with a `KubernetesRun` and a `DaskExecutor`:
flow.run_config = KubernetesRun()
flow.executor = DaskExecutor()
flow.register(project_name='my_project',
              idempotency_key=flow.serialized_hash())
Then, from Prefect Server, I trigger the flow run, but I get the following error:
Failed to load and execute Flow's environment: ModuleNotFoundError("No module named 'A'")
I don't really get why my module is not found.
I have tested a local deployment using docker-compose with a local agent (running in the compose stack), and it was working fine. So I am missing something here.
Any ideas?
Daniel Davee
05/20/2021, 4:08 PM
`image: prefecthq/prefect:latest` on to GKE, can Prefect be run on GKE?
David Glaister
05/20/2021, 4:49 PM
Austin Mackillop
05/20/2021, 6:44 PM